jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
changeset 26448 5853628b0e63
parent 25859 3317bb8137f4
child 32988 da3715f8eec3
--- a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java	Thu Sep 04 12:23:01 2014 -0400
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java	Fri Sep 05 10:54:28 2014 +0200
@@ -297,15 +297,22 @@
     }
 
     /**
-     * Tries to set SIGNAL status unless already completed. Used by
-     * ForkJoinPool. Other variants are directly incorporated into
-     * externalAwaitDone etc.
+     * If not done, sets SIGNAL status and performs Object.wait(timeout).
+     * This task may or may not be done on exit. Ignores interrupts.
      *
-     * @return true if successful
+     * @param timeout using Object.wait conventions.
      */
-    final boolean trySetSignal() {
-        int s = status;
-        return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
+    final void internalWait(long timeout) {
+        int s;
+        if ((s = status) >= 0 && // force completer to issue notify
+            U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+            synchronized (this) {
+                if (status >= 0)
+                    try { wait(timeout); } catch (InterruptedException ie) { }
+                else
+                    notifyAll();
+            }
+        }
     }
 
     /**
@@ -313,35 +320,29 @@
      * @return status upon completion
      */
     private int externalAwaitDone() {
-        int s;
-        ForkJoinPool cp = ForkJoinPool.common;
-        if ((s = status) >= 0) {
-            if (cp != null) {
-                if (this instanceof CountedCompleter)
-                    s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
-                else if (cp.tryExternalUnpush(this))
-                    s = doExec();
-            }
-            if (s >= 0 && (s = status) >= 0) {
-                boolean interrupted = false;
-                do {
-                    if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
-                        synchronized (this) {
-                            if (status >= 0) {
-                                try {
-                                    wait();
-                                } catch (InterruptedException ie) {
-                                    interrupted = true;
-                                }
+        int s = ((this instanceof CountedCompleter) ? // try helping
+                 ForkJoinPool.common.externalHelpComplete(
+                     (CountedCompleter<?>)this, 0) :
+                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
+        if (s >= 0 && (s = status) >= 0) {
+            boolean interrupted = false;
+            do {
+                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+                    synchronized (this) {
+                        if (status >= 0) {
+                            try {
+                                wait(0L);
+                            } catch (InterruptedException ie) {
+                                interrupted = true;
                             }
-                            else
-                                notifyAll();
                         }
+                        else
+                            notifyAll();
                     }
-                } while ((s = status) >= 0);
-                if (interrupted)
-                    Thread.currentThread().interrupt();
-            }
+                }
+            } while ((s = status) >= 0);
+            if (interrupted)
+                Thread.currentThread().interrupt();
         }
         return s;
     }
@@ -351,22 +352,22 @@
      */
     private int externalInterruptibleAwaitDone() throws InterruptedException {
         int s;
-        ForkJoinPool cp = ForkJoinPool.common;
         if (Thread.interrupted())
             throw new InterruptedException();
-        if ((s = status) >= 0 && cp != null) {
-            if (this instanceof CountedCompleter)
-                cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
-            else if (cp.tryExternalUnpush(this))
-                doExec();
-        }
-        while ((s = status) >= 0) {
-            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
-                synchronized (this) {
-                    if (status >= 0)
-                        wait();
-                    else
-                        notifyAll();
+        if ((s = status) >= 0 &&
+            (s = ((this instanceof CountedCompleter) ?
+                  ForkJoinPool.common.externalHelpComplete(
+                      (CountedCompleter<?>)this, 0) :
+                  ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
+                  0)) >= 0) {
+            while ((s = status) >= 0) {
+                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+                    synchronized (this) {
+                        if (status >= 0)
+                            wait(0L);
+                        else
+                            notifyAll();
+                    }
                 }
             }
         }
@@ -386,7 +387,7 @@
             ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
             (w = (wt = (ForkJoinWorkerThread)t).workQueue).
             tryUnpush(this) && (s = doExec()) < 0 ? s :
-            wt.pool.awaitJoin(w, this) :
+            wt.pool.awaitJoin(w, this, 0L) :
             externalAwaitDone();
     }
 
@@ -399,7 +400,8 @@
         int s; Thread t; ForkJoinWorkerThread wt;
         return (s = doExec()) < 0 ? s :
             ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
-            (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
+            (wt = (ForkJoinWorkerThread)t).pool.
+            awaitJoin(wt.workQueue, this, 0L) :
             externalAwaitDone();
     }
 
@@ -577,7 +579,7 @@
         Throwable ex;
         if (e == null || (ex = e.ex) == null)
             return null;
-        if (false && e.thrower != Thread.currentThread().getId()) {
+        if (e.thrower != Thread.currentThread().getId()) {
             Class<? extends Throwable> ec = ex.getClass();
             try {
                 Constructor<?> noArgCtor = null;
@@ -587,13 +589,17 @@
                     Class<?>[] ps = c.getParameterTypes();
                     if (ps.length == 0)
                         noArgCtor = c;
-                    else if (ps.length == 1 && ps[0] == Throwable.class)
-                        return (Throwable)(c.newInstance(ex));
+                    else if (ps.length == 1 && ps[0] == Throwable.class) {
+                        Throwable wx = (Throwable)c.newInstance(ex);
+                        return (wx == null) ? ex : wx;
+                    }
                 }
                 if (noArgCtor != null) {
                     Throwable wx = (Throwable)(noArgCtor.newInstance());
-                    wx.initCause(ex);
-                    return wx;
+                    if (wx != null) {
+                        wx.initCause(ex);
+                        return wx;
+                    }
                 }
             } catch (Exception ignore) {
             }
@@ -1017,67 +1023,40 @@
      */
     public final V get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
+        int s;
+        long nanos = unit.toNanos(timeout);
         if (Thread.interrupted())
             throw new InterruptedException();
-        // Messy in part because we measure in nanosecs, but wait in millisecs
-        int s; long ms;
-        long ns = unit.toNanos(timeout);
-        ForkJoinPool cp;
-        if ((s = status) >= 0 && ns > 0L) {
-            long deadline = System.nanoTime() + ns;
-            ForkJoinPool p = null;
-            ForkJoinPool.WorkQueue w = null;
+        if ((s = status) >= 0 && nanos > 0L) {
+            long d = System.nanoTime() + nanos;
+            long deadline = (d == 0L) ? 1L : d; // avoid 0
             Thread t = Thread.currentThread();
             if (t instanceof ForkJoinWorkerThread) {
                 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
-                p = wt.pool;
-                w = wt.workQueue;
-                p.helpJoinOnce(w, this); // no retries on failure
-            }
-            else if ((cp = ForkJoinPool.common) != null) {
-                if (this instanceof CountedCompleter)
-                    cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
-                else if (cp.tryExternalUnpush(this))
-                    doExec();
+                s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
             }
-            boolean canBlock = false;
-            boolean interrupted = false;
-            try {
-                while ((s = status) >= 0) {
-                    if (w != null && w.qlock < 0)
-                        cancelIgnoringExceptions(this);
-                    else if (!canBlock) {
-                        if (p == null || p.tryCompensate(p.ctl))
-                            canBlock = true;
-                    }
-                    else {
-                        if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
-                            U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
-                            synchronized (this) {
-                                if (status >= 0) {
-                                    try {
-                                        wait(ms);
-                                    } catch (InterruptedException ie) {
-                                        if (p == null)
-                                            interrupted = true;
-                                    }
-                                }
-                                else
-                                    notifyAll();
-                            }
+            else if ((s = ((this instanceof CountedCompleter) ?
+                           ForkJoinPool.common.externalHelpComplete(
+                               (CountedCompleter<?>)this, 0) :
+                           ForkJoinPool.common.tryExternalUnpush(this) ?
+                           doExec() : 0)) >= 0) {
+                long ns, ms; // measure in nanosecs, but wait in millisecs
+                while ((s = status) >= 0 &&
+                       (ns = deadline - System.nanoTime()) > 0L) {
+                    if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
+                        U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+                        synchronized (this) {
+                            if (status >= 0)
+                                wait(ms); // OK to throw InterruptedException
+                            else
+                                notifyAll();
                         }
-                        if ((s = status) < 0 || interrupted ||
-                            (ns = deadline - System.nanoTime()) <= 0L)
-                            break;
                     }
                 }
-            } finally {
-                if (p != null && canBlock)
-                    p.incrementActiveCount();
             }
-            if (interrupted)
-                throw new InterruptedException();
         }
+        if (s >= 0)
+            s = status;
         if ((s &= DONE_MASK) != NORMAL) {
             Throwable ex;
             if (s == CANCELLED)