8192944: Miscellaneous changes imported from jsr166 CVS 2017-12-08
authordl
Fri, 08 Dec 2017 15:30:53 -0800
changeset 48233 dd5157f363ab
parent 48232 bf476235671a
child 48234 e7342e1becb4
8192944: Miscellaneous changes imported from jsr166 CVS 2017-12-08 Reviewed-by: martin, psandoz, chegar
src/java.base/share/classes/java/util/concurrent/CountedCompleter.java
src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
test/jdk/java/util/concurrent/tck/ExecutorCompletionServiceTest.java
--- a/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java	Fri Dec 08 15:26:56 2017 -0800
+++ b/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java	Fri Dec 08 15:30:53 2017 -0800
@@ -735,7 +735,7 @@
         CountedCompleter<?> a = this, s = a;
         while (a.onExceptionalCompletion(ex, s) &&
                (a = (s = a).completer) != null && a.status >= 0 &&
-               a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
+               isExceptionalStatus(a.recordExceptionalCompletion(ex)))
             ;
     }
 
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java	Fri Dec 08 15:26:56 2017 -0800
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java	Fri Dec 08 15:30:53 2017 -0800
@@ -219,52 +219,59 @@
      * methods in a way that flows well in javadocs.
      */
 
-    /*
+    /**
      * The status field holds run control status bits packed into a
-     * single int to minimize footprint and to ensure atomicity (via
-     * CAS).  Status is initially zero, and takes on nonnegative
-     * values until completed, upon which status (anded with
-     * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
-     * undergoing blocking waits by other threads have the SIGNAL bit
-     * set.  Completion of a stolen task with SIGNAL set awakens any
-     * waiters via notifyAll. Even though suboptimal for some
-     * purposes, we use basic builtin wait/notify to take advantage of
-     * "monitor inflation" in JVMs that we would otherwise need to
-     * emulate to avoid adding further per-task bookkeeping overhead.
-     * We want these monitors to be "fat", i.e., not use biasing or
-     * thin-lock techniques, so use some odd coding idioms that tend
-     * to avoid them, mainly by arranging that every synchronized
-     * block performs a wait, notifyAll or both.
+     * single int to ensure atomicity.  Status is initially zero, and
+     * takes on nonnegative values until completed, upon which it
+     * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
+     * exceptional) and THROWN (in which case an exception has been
+     * stored). Tasks with dependent blocked waiting joiners have the
+     * SIGNAL bit set.  Completion of a task with SIGNAL set awakens
+     * any waiters via notifyAll. (Waiters also help signal others
+     * upon completion.)
      *
      * These control bits occupy only (some of) the upper half (16
      * bits) of status field. The lower bits are used for user-defined
      * tags.
      */
+    volatile int status; // accessed directly by pool and workers
 
-    /** The run status of this task */
-    volatile int status; // accessed directly by pool and workers
-    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
-    static final int NORMAL      = 0xf0000000;  // must be negative
-    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
-    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
-    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
-    static final int SMASK       = 0x0000ffff;  // short bits for tags
+    private static final int DONE     = 1 << 31; // must be negative
+    private static final int ABNORMAL = 1 << 18; // set atomically with DONE
+    private static final int THROWN   = 1 << 17; // set atomically with ABNORMAL
+    private static final int SIGNAL   = 1 << 16; // true if joiner waiting
+    private static final int SMASK    = 0xffff;  // short bits for tags
+
+    static boolean isExceptionalStatus(int s) {  // needed by subclasses
+        return (s & THROWN) != 0;
+    }
 
     /**
-     * Marks completion and wakes up threads waiting to join this
-     * task.
+     * Sets DONE status and wakes up threads waiting to join this task.
      *
-     * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
-     * @return completion status on exit
+     * @return status on exit
      */
-    private int setCompletion(int completion) {
-        for (int s;;) {
+    private int setDone() {
+        int s;
+        if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0)
+            synchronized (this) { notifyAll(); }
+        return s | DONE;
+    }
+
+    /**
+     * Marks cancelled or exceptional completion unless already done.
+     *
+     * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
+     * @return status on exit
+     */
+    private int abnormalCompletion(int completion) {
+        for (int s, ns;;) {
             if ((s = status) < 0)
                 return s;
-            if (STATUS.compareAndSet(this, s, s | completion)) {
-                if ((s >>> 16) != 0)
+            else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) {
+                if ((s & SIGNAL) != 0)
                     synchronized (this) { notifyAll(); }
-                return completion;
+                return ns;
             }
         }
     }
@@ -282,10 +289,11 @@
             try {
                 completed = exec();
             } catch (Throwable rex) {
-                return setExceptionalCompletion(rex);
+                completed = false;
+                s = setExceptionalCompletion(rex);
             }
             if (completed)
-                s = setCompletion(NORMAL);
+                s = setDone();
         }
         return s;
     }
@@ -297,9 +305,7 @@
      * @param timeout using Object.wait conventions.
      */
     final void internalWait(long timeout) {
-        int s;
-        if ((s = status) >= 0 && // force completer to issue notify
-            STATUS.compareAndSet(this, s, s | SIGNAL)) {
+        if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) {
             synchronized (this) {
                 if (status >= 0)
                     try { wait(timeout); } catch (InterruptedException ie) { }
@@ -314,27 +320,24 @@
      * @return status upon completion
      */
     private int externalAwaitDone() {
-        int s = ((this instanceof CountedCompleter) ? // try helping
-                 ForkJoinPool.common.externalHelpComplete(
-                     (CountedCompleter<?>)this, 0) :
-                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
-        if (s >= 0 && (s = status) >= 0) {
+        int s = tryExternalHelp();
+        if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
             boolean interrupted = false;
-            do {
-                if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
-                    synchronized (this) {
-                        if (status >= 0) {
-                            try {
-                                wait(0L);
-                            } catch (InterruptedException ie) {
-                                interrupted = true;
-                            }
+            synchronized (this) {
+                for (;;) {
+                    if ((s = status) >= 0) {
+                        try {
+                            wait(0L);
+                        } catch (InterruptedException ie) {
+                            interrupted = true;
                         }
-                        else
-                            notifyAll();
+                    }
+                    else {
+                        notifyAll();
+                        break;
                     }
                 }
-            } while ((s = status) >= 0);
+            }
             if (interrupted)
                 Thread.currentThread().interrupt();
         }
@@ -345,30 +348,40 @@
      * Blocks a non-worker-thread until completion or interruption.
      */
     private int externalInterruptibleAwaitDone() throws InterruptedException {
-        int s;
-        if (Thread.interrupted())
-            throw new InterruptedException();
-        if ((s = status) >= 0 &&
-            (s = ((this instanceof CountedCompleter) ?
-                  ForkJoinPool.common.externalHelpComplete(
-                      (CountedCompleter<?>)this, 0) :
-                  ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
-                  0)) >= 0) {
-            while ((s = status) >= 0) {
-                if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
-                    synchronized (this) {
-                        if (status >= 0)
-                            wait(0L);
-                        else
-                            notifyAll();
+        int s = tryExternalHelp();
+        if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
+            synchronized (this) {
+                for (;;) {
+                    if ((s = status) >= 0)
+                        wait(0L);
+                    else {
+                        notifyAll();
+                        break;
                     }
                 }
             }
         }
+        else if (Thread.interrupted())
+            throw new InterruptedException();
         return s;
     }
 
     /**
+     * Tries to help with tasks allowed for external callers.
+     *
+     * @return current status
+     */
+    private int tryExternalHelp() {
+        int s;
+        return ((s = status) < 0 ? s:
+                (this instanceof CountedCompleter) ?
+                ForkJoinPool.common.externalHelpComplete(
+                    (CountedCompleter<?>)this, 0) :
+                ForkJoinPool.common.tryExternalUnpush(this) ?
+                doExec() : 0);
+    }
+
+    /**
      * Implementation for join, get, quietlyJoin. Directly handles
      * only cases of already-completed, external wait, and
      * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
@@ -475,7 +488,7 @@
             } finally {
                 lock.unlock();
             }
-            s = setCompletion(EXCEPTIONAL);
+            s = abnormalCompletion(DONE | ABNORMAL | THROWN);
         }
         return s;
     }
@@ -487,7 +500,7 @@
      */
     private int setExceptionalCompletion(Throwable ex) {
         int s = recordExceptionalCompletion(ex);
-        if ((s & DONE_MASK) == EXCEPTIONAL)
+        if ((s & THROWN) != 0)
             internalPropagateException(ex);
         return s;
     }
@@ -662,10 +675,8 @@
      * Throws exception, if any, associated with the given status.
      */
     private void reportException(int s) {
-        if (s == CANCELLED)
-            throw new CancellationException();
-        if (s == EXCEPTIONAL)
-            rethrow(getThrowableException());
+        rethrow((s & THROWN) != 0 ? getThrowableException() :
+                new CancellationException());
     }
 
     // public methods
@@ -707,7 +718,7 @@
      */
     public final V join() {
         int s;
-        if ((s = doJoin() & DONE_MASK) != NORMAL)
+        if (((s = doJoin()) & ABNORMAL) != 0)
             reportException(s);
         return getRawResult();
     }
@@ -722,7 +733,7 @@
      */
     public final V invoke() {
         int s;
-        if ((s = doInvoke() & DONE_MASK) != NORMAL)
+        if (((s = doInvoke()) & ABNORMAL) != 0)
             reportException(s);
         return getRawResult();
     }
@@ -747,9 +758,9 @@
     public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
         int s1, s2;
         t2.fork();
-        if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
+        if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
             t1.reportException(s1);
-        if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
+        if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
             t2.reportException(s2);
     }
 
@@ -779,7 +790,7 @@
             }
             else if (i != 0)
                 t.fork();
-            else if (t.doInvoke() < NORMAL && ex == null)
+            else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
                 ex = t.getException();
         }
         for (int i = 1; i <= last; ++i) {
@@ -787,7 +798,7 @@
             if (t != null) {
                 if (ex != null)
                     t.cancel(false);
-                else if (t.doJoin() < NORMAL)
+                else if ((t.doJoin() & ABNORMAL) != 0)
                     ex = t.getException();
             }
         }
@@ -831,7 +842,7 @@
             }
             else if (i != 0)
                 t.fork();
-            else if (t.doInvoke() < NORMAL && ex == null)
+            else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
                 ex = t.getException();
         }
         for (int i = 1; i <= last; ++i) {
@@ -839,7 +850,7 @@
             if (t != null) {
                 if (ex != null)
                     t.cancel(false);
-                else if (t.doJoin() < NORMAL)
+                else if ((t.doJoin() & ABNORMAL) != 0)
                     ex = t.getException();
             }
         }
@@ -876,7 +887,8 @@
      * @return {@code true} if this task is now cancelled
      */
     public boolean cancel(boolean mayInterruptIfRunning) {
-        return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
+        int s = abnormalCompletion(DONE | ABNORMAL);
+        return (s & (ABNORMAL | THROWN)) == ABNORMAL;
     }
 
     public final boolean isDone() {
@@ -884,7 +896,7 @@
     }
 
     public final boolean isCancelled() {
-        return (status & DONE_MASK) == CANCELLED;
+        return (status & (ABNORMAL | THROWN)) == ABNORMAL;
     }
 
     /**
@@ -893,7 +905,7 @@
      * @return {@code true} if this task threw an exception or was cancelled
      */
     public final boolean isCompletedAbnormally() {
-        return status < NORMAL;
+        return (status & ABNORMAL) != 0;
     }
 
     /**
@@ -904,7 +916,7 @@
      * exception and was not cancelled
      */
     public final boolean isCompletedNormally() {
-        return (status & DONE_MASK) == NORMAL;
+        return (status & (DONE | ABNORMAL)) == DONE;
     }
 
     /**
@@ -915,9 +927,9 @@
      * @return the exception, or {@code null} if none
      */
     public final Throwable getException() {
-        int s = status & DONE_MASK;
-        return ((s >= NORMAL)    ? null :
-                (s == CANCELLED) ? new CancellationException() :
+        int s = status;
+        return ((s & ABNORMAL) == 0 ? null :
+                (s & THROWN)   == 0 ? new CancellationException() :
                 getThrowableException());
     }
 
@@ -961,7 +973,7 @@
             setExceptionalCompletion(rex);
             return;
         }
-        setCompletion(NORMAL);
+        setDone();
     }
 
     /**
@@ -973,7 +985,7 @@
      * @since 1.8
      */
     public final void quietlyComplete() {
-        setCompletion(NORMAL);
+        setDone();
     }
 
     /**
@@ -990,11 +1002,12 @@
     public final V get() throws InterruptedException, ExecutionException {
         int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
             doJoin() : externalInterruptibleAwaitDone();
-        if ((s &= DONE_MASK) == CANCELLED)
+        if ((s & THROWN) != 0)
+            throw new ExecutionException(getThrowableException());
+        else if ((s & ABNORMAL) != 0)
             throw new CancellationException();
-        if (s == EXCEPTIONAL)
-            throw new ExecutionException(getThrowableException());
-        return getRawResult();
+        else
+            return getRawResult();
     }
 
     /**
@@ -1034,7 +1047,7 @@
                 while ((s = status) >= 0 &&
                        (ns = deadline - System.nanoTime()) > 0L) {
                     if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
-                        STATUS.compareAndSet(this, s, s | SIGNAL)) {
+                        (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
                         synchronized (this) {
                             if (status >= 0)
                                 wait(ms); // OK to throw InterruptedException
@@ -1046,15 +1059,13 @@
             }
         }
         if (s >= 0)
-            s = status;
-        if ((s &= DONE_MASK) != NORMAL) {
-            if (s == CANCELLED)
-                throw new CancellationException();
-            if (s != EXCEPTIONAL)
-                throw new TimeoutException();
+            throw new TimeoutException();
+        else if ((s & THROWN) != 0)
             throw new ExecutionException(getThrowableException());
-        }
-        return getRawResult();
+        else if ((s & ABNORMAL) != 0)
+            throw new CancellationException();
+        else
+            return getRawResult();
     }
 
     /**
@@ -1110,7 +1121,7 @@
      * setRawResult(null)}.
      */
     public void reinitialize() {
-        if ((status & DONE_MASK) == EXCEPTIONAL)
+        if ((status & THROWN) != 0)
             clearExceptionalCompletion();
         else
             status = 0;
@@ -1327,8 +1338,8 @@
      */
     public final short setForkJoinTaskTag(short newValue) {
         for (int s;;) {
-            if (STATUS.compareAndSet(this, s = status,
-                                     (s & ~SMASK) | (newValue & SMASK)))
+            if (STATUS.weakCompareAndSet(this, s = status,
+                                         (s & ~SMASK) | (newValue & SMASK)))
                 return (short)s;
         }
     }
@@ -1351,8 +1362,8 @@
         for (int s;;) {
             if ((short)(s = status) != expect)
                 return false;
-            if (STATUS.compareAndSet(this, s,
-                                     (s & ~SMASK) | (update & SMASK)))
+            if (STATUS.weakCompareAndSet(this, s,
+                                         (s & ~SMASK) | (update & SMASK)))
                 return true;
         }
     }
--- a/test/jdk/java/util/concurrent/tck/ExecutorCompletionServiceTest.java	Fri Dec 08 15:26:56 2017 -0800
+++ b/test/jdk/java/util/concurrent/tck/ExecutorCompletionServiceTest.java	Fri Dec 08 15:30:53 2017 -0800
@@ -105,8 +105,7 @@
     /**
      * A taken submitted task is completed
      */
-    public void testTake()
-        throws InterruptedException, ExecutionException {
+    public void testTake() throws Exception {
         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
         cs.submit(new StringTask());
         Future f = cs.take();
@@ -127,8 +126,7 @@
     /**
      * poll returns non-null when the returned task is completed
      */
-    public void testPoll1()
-        throws InterruptedException, ExecutionException {
+    public void testPoll1() throws Exception {
         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
         assertNull(cs.poll());
         cs.submit(new StringTask());
@@ -147,15 +145,15 @@
     /**
      * timed poll returns non-null when the returned task is completed
      */
-    public void testPoll2()
-        throws InterruptedException, ExecutionException {
+    public void testPoll2() throws Exception {
         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
         assertNull(cs.poll());
         cs.submit(new StringTask());
 
         long startTime = System.nanoTime();
         Future f;
-        while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) {
+        while ((f = cs.poll(timeoutMillis(), MILLISECONDS)) == null) {
+            assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
                 fail("timed out");
             Thread.yield();
@@ -167,8 +165,7 @@
     /**
      * poll returns null before the returned task is completed
      */
-    public void testPollReturnsNull()
-        throws InterruptedException, ExecutionException {
+    public void testPollReturnsNullBeforeCompletion() throws Exception {
         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
         final CountDownLatch proceed = new CountDownLatch(1);
         cs.submit(new Callable() { public String call() throws Exception {
@@ -188,29 +185,28 @@
     /**
      * successful and failed tasks are both returned
      */
-    public void testTaskAssortment()
-        throws InterruptedException, ExecutionException {
+    public void testTaskAssortment() throws Exception {
         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
         ArithmeticException ex = new ArithmeticException();
-        for (int i = 0; i < 2; i++) {
+        final int rounds = 2;
+        for (int i = rounds; i--> 0; ) {
             cs.submit(new StringTask());
             cs.submit(callableThrowing(ex));
             cs.submit(runnableThrowing(ex), null);
         }
         int normalCompletions = 0;
         int exceptionalCompletions = 0;
-        for (int i = 0; i < 3 * 2; i++) {
+        for (int i = 3 * rounds; i--> 0; ) {
             try {
-                if (cs.take().get() == TEST_STRING)
-                    normalCompletions++;
-            }
-            catch (ExecutionException expected) {
-                assertTrue(expected.getCause() instanceof ArithmeticException);
+                assertSame(TEST_STRING, cs.take().get());
+                normalCompletions++;
+            } catch (ExecutionException expected) {
+                assertSame(ex, expected.getCause());
                 exceptionalCompletions++;
             }
         }
-        assertEquals(2 * 1, normalCompletions);
-        assertEquals(2 * 2, exceptionalCompletions);
+        assertEquals(1 * rounds, normalCompletions);
+        assertEquals(2 * rounds, exceptionalCompletions);
         assertNull(cs.poll());
     }