jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java
changeset 8765 dfc2a131d08a
parent 7976 f273c0d04215
child 9242 ef138d47df58
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java	Tue Mar 08 17:52:32 2011 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java	Tue Mar 08 18:16:14 2011 +0000
@@ -41,7 +41,8 @@
 import java.util.List;
 import java.util.RandomAccess;
 import java.util.Map;
-import java.util.WeakHashMap;
+import java.lang.ref.WeakReference;
+import java.lang.ref.ReferenceQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -52,6 +53,8 @@
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
+import java.lang.reflect.Constructor;
 
 /**
  * Abstract base class for tasks that run within a {@link ForkJoinPool}.
@@ -95,7 +98,11 @@
  * rethrown to callers attempting to join them. These exceptions may
  * additionally include {@link RejectedExecutionException} stemming
  * from internal resource exhaustion, such as failure to allocate
- * internal task queues.
+ * internal task queues. Rethrown exceptions behave in the same way as
+ * regular exceptions, but, when possible, contain stack traces (as
+ * displayed for example using {@code ex.printStackTrace()}) of both
+ * the thread that initiated the computation as well as the thread
+ * actually encountering the exception; minimally only the latter.
  *
  * <p>The primary method for awaiting completion and extracting
  * results of a task is {@link #join}, but there are several variants:
@@ -192,8 +199,7 @@
      * status maintenance (2) execution and awaiting completion (3)
      * user-level methods that additionally report results. This is
      * sometimes hard to see because this file orders exported methods
-     * in a way that flows well in javadocs. In particular, most
-     * join mechanics are in method quietlyJoin, below.
+     * in a way that flows well in javadocs.
      */
 
     /*
@@ -215,91 +221,67 @@
 
     /** The run status of this task */
     volatile int status; // accessed directly by pool and workers
-
     private static final int NORMAL      = -1;
     private static final int CANCELLED   = -2;
     private static final int EXCEPTIONAL = -3;
     private static final int SIGNAL      =  1;
 
     /**
-     * Table of exceptions thrown by tasks, to enable reporting by
-     * callers. Because exceptions are rare, we don't directly keep
-     * them with task objects, but instead use a weak ref table.  Note
-     * that cancellation exceptions don't appear in the table, but are
-     * instead recorded as status values.
-     * TODO: Use ConcurrentReferenceHashMap
-     */
-    static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
-        Collections.synchronizedMap
-        (new WeakHashMap<ForkJoinTask<?>, Throwable>());
-
-    // Maintaining completion status
-
-    /**
      * Marks completion and wakes up threads waiting to join this task,
      * also clearing signal request bits.
      *
      * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
+     * @return completion status on exit
      */
-    private void setCompletion(int completion) {
-        int s;
-        while ((s = status) >= 0) {
+    private int setCompletion(int completion) {
+        for (int s;;) {
+            if ((s = status) < 0)
+                return s;
             if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
                 if (s != 0)
                     synchronized (this) { notifyAll(); }
-                break;
+                return completion;
             }
         }
     }
 
     /**
-     * Records exception and sets exceptional completion.
+     * Tries to block a worker thread until completed or timed out.
+     * Uses Object.wait time argument conventions.
+     * May fail on contention or interrupt.
      *
-     * @return status on exit
+     * @param millis if > 0, wait time.
      */
-    private void setExceptionalCompletion(Throwable rex) {
-        exceptionMap.put(this, rex);
-        setCompletion(EXCEPTIONAL);
-    }
-
-    /**
-     * Blocks a worker thread until completed or timed out.  Called
-     * only by pool.
-     */
-    final void internalAwaitDone(long millis, int nanos) {
-        int s = status;
-        if ((s == 0 &&
-             UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
-            s > 0)  {
-            try {     // the odd construction reduces lock bias effects
+    final void tryAwaitDone(long millis) {
+        int s;
+        try {
+            if (((s = status) > 0 ||
+                 (s == 0 &&
+                  UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
+                status > 0) {
                 synchronized (this) {
                     if (status > 0)
-                        wait(millis, nanos);
-                    else
-                        notifyAll();
+                        wait(millis);
                 }
-            } catch (InterruptedException ie) {
-                cancelIfTerminating();
             }
+        } catch (InterruptedException ie) {
+            // caller must check termination
         }
     }
 
     /**
      * Blocks a non-worker-thread until completion.
+     * @return status upon completion
      */
-    private void externalAwaitDone() {
-        if (status >= 0) {
+    private int externalAwaitDone() {
+        int s;
+        if ((s = status) >= 0) {
             boolean interrupted = false;
             synchronized (this) {
-                for (;;) {
-                    int s = status;
+                while ((s = status) >= 0) {
                     if (s == 0)
                         UNSAFE.compareAndSwapInt(this, statusOffset,
                                                  0, SIGNAL);
-                    else if (s < 0) {
-                        notifyAll();
-                        break;
-                    }
                     else {
                         try {
                             wait();
@@ -312,53 +294,308 @@
             if (interrupted)
                 Thread.currentThread().interrupt();
         }
+        return s;
     }
 
     /**
      * Blocks a non-worker-thread until completion or interruption or timeout.
      */
-    private void externalInterruptibleAwaitDone(boolean timed, long nanos)
+    private int externalInterruptibleAwaitDone(long millis)
         throws InterruptedException {
+        int s;
         if (Thread.interrupted())
             throw new InterruptedException();
-        if (status >= 0) {
-            long startTime = timed ? System.nanoTime() : 0L;
+        if ((s = status) >= 0) {
             synchronized (this) {
-                for (;;) {
-                    long nt;
-                    int s = status;
+                while ((s = status) >= 0) {
                     if (s == 0)
                         UNSAFE.compareAndSwapInt(this, statusOffset,
                                                  0, SIGNAL);
-                    else if (s < 0) {
-                        notifyAll();
+                    else {
+                        wait(millis);
+                        if (millis > 0L)
+                            break;
+                    }
+                }
+            }
+        }
+        return s;
+    }
+
+    /**
+     * Primary execution method for stolen tasks. Unless done, calls
+     * exec and records status if completed, but doesn't wait for
+     * completion otherwise.
+     */
+    final void doExec() {
+        if (status >= 0) {
+            boolean completed;
+            try {
+                completed = exec();
+            } catch (Throwable rex) {
+                setExceptionalCompletion(rex);
+                return;
+            }
+            if (completed)
+                setCompletion(NORMAL); // must be outside try block
+        }
+    }
+
+    /**
+     * Primary mechanics for join, get, quietlyJoin.
+     * @return status upon completion
+     */
+    private int doJoin() {
+        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
+        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+            if ((s = status) < 0)
+                return s;
+            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
+                try {
+                    completed = exec();
+                } catch (Throwable rex) {
+                    return setExceptionalCompletion(rex);
+                }
+                if (completed)
+                    return setCompletion(NORMAL);
+            }
+            return w.joinTask(this);
+        }
+        else
+            return externalAwaitDone();
+    }
+
+    /**
+     * Primary mechanics for invoke, quietlyInvoke.
+     * @return status upon completion
+     */
+    private int doInvoke() {
+        int s; boolean completed;
+        if ((s = status) < 0)
+            return s;
+        try {
+            completed = exec();
+        } catch (Throwable rex) {
+            return setExceptionalCompletion(rex);
+        }
+        if (completed)
+            return setCompletion(NORMAL);
+        else
+            return doJoin();
+    }
+
+    // Exception table support
+
+    /**
+     * Table of exceptions thrown by tasks, to enable reporting by
+     * callers. Because exceptions are rare, we don't directly keep
+     * them with task objects, but instead use a weak ref table.  Note
+     * that cancellation exceptions don't appear in the table, but are
+     * instead recorded as status values.
+     *
+     * Note: These statics are initialized below in static block.
+     */
+    private static final ExceptionNode[] exceptionTable;
+    private static final ReentrantLock exceptionTableLock;
+    private static final ReferenceQueue<Object> exceptionTableRefQueue;
+
+    /**
+     * Fixed capacity for exceptionTable.
+     */
+    private static final int EXCEPTION_MAP_CAPACITY = 32;
+
+    /**
+     * Key-value nodes for exception table.  The chained hash table
+     * uses identity comparisons, full locking, and weak references
+     * for keys. The table has a fixed capacity because it only
+     * maintains task exceptions long enough for joiners to access
+     * them, so should never become very large for sustained
+     * periods. However, since we do not know when the last joiner
+     * completes, we must use weak references and expunge them. We do
+     * so on each operation (hence full locking). Also, some thread in
+     * any ForkJoinPool will call helpExpungeStaleExceptions when its
+     * pool becomes isQuiescent.
+     */
+    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>>{
+        final Throwable ex;
+        ExceptionNode next;
+        final long thrower;  // use id not ref to avoid weak cycles
+        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
+            super(task, exceptionTableRefQueue);
+            this.ex = ex;
+            this.next = next;
+            this.thrower = Thread.currentThread().getId();
+        }
+    }
+
+    /**
+     * Records exception and sets exceptional completion.
+     *
+     * @return status on exit
+     */
+    private int setExceptionalCompletion(Throwable ex) {
+        int h = System.identityHashCode(this);
+        final ReentrantLock lock = exceptionTableLock;
+        lock.lock();
+        try {
+            expungeStaleExceptions();
+            ExceptionNode[] t = exceptionTable;
+            int i = h & (t.length - 1);
+            for (ExceptionNode e = t[i]; ; e = e.next) {
+                if (e == null) {
+                    t[i] = new ExceptionNode(this, ex, t[i]);
+                    break;
+                }
+                if (e.get() == this) // already present
+                    break;
+            }
+        } finally {
+            lock.unlock();
+        }
+        return setCompletion(EXCEPTIONAL);
+    }
+
+    /**
+     * Removes exception node and clears status
+     */
+    private void clearExceptionalCompletion() {
+        int h = System.identityHashCode(this);
+        final ReentrantLock lock = exceptionTableLock;
+        lock.lock();
+        try {
+            ExceptionNode[] t = exceptionTable;
+            int i = h & (t.length - 1);
+            ExceptionNode e = t[i];
+            ExceptionNode pred = null;
+            while (e != null) {
+                ExceptionNode next = e.next;
+                if (e.get() == this) {
+                    if (pred == null)
+                        t[i] = next;
+                    else
+                        pred.next = next;
+                    break;
+                }
+                pred = e;
+                e = next;
+            }
+            expungeStaleExceptions();
+            status = 0;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns a rethrowable exception for the given task, if
+     * available. To provide accurate stack traces, if the exception
+     * was not thrown by the current thread, we try to create a new
+     * exception of the same type as the one thrown, but with the
+     * recorded exception as its cause. If there is no such
+     * constructor, we instead try to use a no-arg constructor,
+     * followed by initCause, to the same effect. If none of these
+     * apply, or any fail due to other exceptions, we return the
+     * recorded exception, which is still correct, although it may
+     * contain a misleading stack trace.
+     *
+     * @return the exception, or null if none
+     */
+    private Throwable getThrowableException() {
+        if (status != EXCEPTIONAL)
+            return null;
+        int h = System.identityHashCode(this);
+        ExceptionNode e;
+        final ReentrantLock lock = exceptionTableLock;
+        lock.lock();
+        try {
+            expungeStaleExceptions();
+            ExceptionNode[] t = exceptionTable;
+            e = t[h & (t.length - 1)];
+            while (e != null && e.get() != this)
+                e = e.next;
+        } finally {
+            lock.unlock();
+        }
+        Throwable ex;
+        if (e == null || (ex = e.ex) == null)
+            return null;
+        if (e.thrower != Thread.currentThread().getId()) {
+            Class ec = ex.getClass();
+            try {
+                Constructor<?> noArgCtor = null;
+                Constructor<?>[] cs = ec.getConstructors();// public ctors only
+                for (int i = 0; i < cs.length; ++i) {
+                    Constructor<?> c = cs[i];
+                    Class<?>[] ps = c.getParameterTypes();
+                    if (ps.length == 0)
+                        noArgCtor = c;
+                    else if (ps.length == 1 && ps[0] == Throwable.class)
+                        return (Throwable)(c.newInstance(ex));
+                }
+                if (noArgCtor != null) {
+                    Throwable wx = (Throwable)(noArgCtor.newInstance());
+                    wx.initCause(ex);
+                    return wx;
+                }
+            } catch (Exception ignore) {
+            }
+        }
+        return ex;
+    }
+
+    /**
+     * Poll stale refs and remove them. Call only while holding lock.
+     */
+    private static void expungeStaleExceptions() {
+        for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
+            if (x instanceof ExceptionNode) {
+                ForkJoinTask<?> key = ((ExceptionNode)x).get();
+                ExceptionNode[] t = exceptionTable;
+                int i = System.identityHashCode(key) & (t.length - 1);
+                ExceptionNode e = t[i];
+                ExceptionNode pred = null;
+                while (e != null) {
+                    ExceptionNode next = e.next;
+                    if (e == x) {
+                        if (pred == null)
+                            t[i] = next;
+                        else
+                            pred.next = next;
                         break;
                     }
-                    else if (!timed)
-                        wait();
-                    else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
-                        wait(nt / 1000000, (int)(nt % 1000000));
-                    else
-                        break;
+                    pred = e;
+                    e = next;
                 }
             }
         }
     }
 
     /**
-     * Unless done, calls exec and records status if completed, but
-     * doesn't wait for completion otherwise. Primary execution method
-     * for ForkJoinWorkerThread.
+     * If lock is available, poll stale refs and remove them.
+     * Called from ForkJoinPool when pools become quiescent.
      */
-    final void quietlyExec() {
-        try {
-            if (status < 0 || !exec())
-                return;
-        } catch (Throwable rex) {
-            setExceptionalCompletion(rex);
-            return;
+    static final void helpExpungeStaleExceptions() {
+        final ReentrantLock lock = exceptionTableLock;
+        if (lock.tryLock()) {
+            try {
+                expungeStaleExceptions();
+            } finally {
+                lock.unlock();
+            }
         }
-        setCompletion(NORMAL); // must be outside try block
+    }
+
+    /**
+     * Report the result of invoke or join; called only upon
+     * non-normal return of internal versions.
+     */
+    private V reportResult() {
+        int s; Throwable ex;
+        if ((s = status) == CANCELLED)
+            throw new CancellationException();
+        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
+            UNSAFE.throwException(ex);
+        return getRawResult();
     }
 
     // public methods
@@ -399,11 +636,10 @@
      * @return the computed result
      */
     public final V join() {
-        quietlyJoin();
-        Throwable ex;
-        if (status < NORMAL && (ex = getException()) != null)
-            UNSAFE.throwException(ex);
-        return getRawResult();
+        if (doJoin() != NORMAL)
+            return reportResult();
+        else
+            return getRawResult();
     }
 
     /**
@@ -415,11 +651,10 @@
      * @return the computed result
      */
     public final V invoke() {
-        quietlyInvoke();
-        Throwable ex;
-        if (status < NORMAL && (ex = getException()) != null)
-            UNSAFE.throwException(ex);
-        return getRawResult();
+        if (doInvoke() != NORMAL)
+            return reportResult();
+        else
+            return getRawResult();
     }
 
     /**
@@ -483,22 +718,16 @@
             }
             else if (i != 0)
                 t.fork();
-            else {
-                t.quietlyInvoke();
-                if (ex == null && t.status < NORMAL)
-                    ex = t.getException();
-            }
+            else if (t.doInvoke() < NORMAL && ex == null)
+                ex = t.getException();
         }
         for (int i = 1; i <= last; ++i) {
             ForkJoinTask<?> t = tasks[i];
             if (t != null) {
                 if (ex != null)
                     t.cancel(false);
-                else {
-                    t.quietlyJoin();
-                    if (ex == null && t.status < NORMAL)
-                        ex = t.getException();
-                }
+                else if (t.doJoin() < NORMAL && ex == null)
+                    ex = t.getException();
             }
         }
         if (ex != null)
@@ -546,22 +775,16 @@
             }
             else if (i != 0)
                 t.fork();
-            else {
-                t.quietlyInvoke();
-                if (ex == null && t.status < NORMAL)
-                    ex = t.getException();
-            }
+            else if (t.doInvoke() < NORMAL && ex == null)
+                ex = t.getException();
         }
         for (int i = 1; i <= last; ++i) {
             ForkJoinTask<?> t = ts.get(i);
             if (t != null) {
                 if (ex != null)
                     t.cancel(false);
-                else {
-                    t.quietlyJoin();
-                    if (ex == null && t.status < NORMAL)
-                        ex = t.getException();
-                }
+                else if (t.doJoin() < NORMAL && ex == null)
+                    ex = t.getException();
             }
         }
         if (ex != null)
@@ -597,8 +820,7 @@
      * @return {@code true} if this task is now cancelled
      */
     public boolean cancel(boolean mayInterruptIfRunning) {
-        setCompletion(CANCELLED);
-        return status == CANCELLED;
+        return setCompletion(CANCELLED) == CANCELLED;
     }
 
     /**
@@ -614,21 +836,6 @@
         }
     }
 
-    /**
-     * Cancels if current thread is a terminating worker thread,
-     * ignoring any exceptions thrown by cancel.
-     */
-    final void cancelIfTerminating() {
-        Thread t = Thread.currentThread();
-        if ((t instanceof ForkJoinWorkerThread) &&
-            ((ForkJoinWorkerThread) t).isTerminating()) {
-            try {
-                cancel(false);
-            } catch (Throwable ignore) {
-            }
-        }
-    }
-
     public final boolean isDone() {
         return status < 0;
     }
@@ -668,7 +875,7 @@
         int s = status;
         return ((s >= NORMAL)    ? null :
                 (s == CANCELLED) ? new CancellationException() :
-                exceptionMap.get(this));
+                getThrowableException());
     }
 
     /**
@@ -726,19 +933,13 @@
      * member of a ForkJoinPool and was interrupted while waiting
      */
     public final V get() throws InterruptedException, ExecutionException {
-        Thread t = Thread.currentThread();
-        if (t instanceof ForkJoinWorkerThread)
-            quietlyJoin();
-        else
-            externalInterruptibleAwaitDone(false, 0L);
-        int s = status;
-        if (s != NORMAL) {
-            Throwable ex;
-            if (s == CANCELLED)
-                throw new CancellationException();
-            if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
-                throw new ExecutionException(ex);
-        }
+        int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
+            doJoin() : externalInterruptibleAwaitDone(0L);
+        Throwable ex;
+        if (s == CANCELLED)
+            throw new CancellationException();
+        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
+            throw new ExecutionException(ex);
         return getRawResult();
     }
 
@@ -758,20 +959,39 @@
      */
     public final V get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
-        long nanos = unit.toNanos(timeout);
         Thread t = Thread.currentThread();
-        if (t instanceof ForkJoinWorkerThread)
-            ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
-        else
-            externalInterruptibleAwaitDone(true, nanos);
+        if (t instanceof ForkJoinWorkerThread) {
+            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
+            long nanos = unit.toNanos(timeout);
+            if (status >= 0) {
+                boolean completed = false;
+                if (w.unpushTask(this)) {
+                    try {
+                        completed = exec();
+                    } catch (Throwable rex) {
+                        setExceptionalCompletion(rex);
+                    }
+                }
+                if (completed)
+                    setCompletion(NORMAL);
+                else if (status >= 0 && nanos > 0)
+                    w.pool.timedAwaitJoin(this, nanos);
+            }
+        }
+        else {
+            long millis = unit.toMillis(timeout);
+            if (millis > 0)
+                externalInterruptibleAwaitDone(millis);
+        }
         int s = status;
         if (s != NORMAL) {
             Throwable ex;
             if (s == CANCELLED)
                 throw new CancellationException();
-            if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+            if (s != EXCEPTIONAL)
+                throw new TimeoutException();
+            if ((ex = getThrowableException()) != null)
                 throw new ExecutionException(ex);
-            throw new TimeoutException();
         }
         return getRawResult();
     }
@@ -783,28 +1003,7 @@
      * known to have aborted.
      */
     public final void quietlyJoin() {
-        Thread t;
-        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
-            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
-            if (status >= 0) {
-                if (w.unpushTask(this)) {
-                    boolean completed;
-                    try {
-                        completed = exec();
-                    } catch (Throwable rex) {
-                        setExceptionalCompletion(rex);
-                        return;
-                    }
-                    if (completed) {
-                        setCompletion(NORMAL);
-                        return;
-                    }
-                }
-                w.joinTask(this, false, 0L);
-            }
-        }
-        else
-            externalAwaitDone();
+        doJoin();
     }
 
     /**
@@ -813,19 +1012,7 @@
      * exception.
      */
     public final void quietlyInvoke() {
-        if (status >= 0) {
-            boolean completed;
-            try {
-                completed = exec();
-            } catch (Throwable rex) {
-                setExceptionalCompletion(rex);
-                return;
-            }
-            if (completed)
-                setCompletion(NORMAL);
-            else
-                quietlyJoin();
-        }
+        doInvoke();
     }
 
     /**
@@ -864,8 +1051,9 @@
      */
     public void reinitialize() {
         if (status == EXCEPTIONAL)
-            exceptionMap.remove(this);
-        status = 0;
+            clearExceptionalCompletion();
+        else
+            status = 0;
     }
 
     /**
@@ -1176,23 +1364,23 @@
         s.defaultReadObject();
         Object ex = s.readObject();
         if (ex != null)
-            setExceptionalCompletion((Throwable) ex);
+            setExceptionalCompletion((Throwable)ex);
     }
 
     // Unsafe mechanics
-
-    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
-    private static final long statusOffset =
-        objectFieldOffset("status", ForkJoinTask.class);
-
-    private static long objectFieldOffset(String field, Class<?> klazz) {
+    private static final sun.misc.Unsafe UNSAFE;
+    private static final long statusOffset;
+    static {
+        exceptionTableLock = new ReentrantLock();
+        exceptionTableRefQueue = new ReferenceQueue<Object>();
+        exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
         try {
-            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
-        } catch (NoSuchFieldException e) {
-            // Convert Exception to corresponding Error
-            NoSuchFieldError error = new NoSuchFieldError(field);
-            error.initCause(e);
-            throw error;
+            UNSAFE = sun.misc.Unsafe.getUnsafe();
+            statusOffset = UNSAFE.objectFieldOffset
+                (ForkJoinTask.class.getDeclaredField("status"));
+        } catch (Exception e) {
+            throw new Error(e);
         }
     }
+
 }