jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
changeset 2430 975a98680ae8
parent 2165 98373487fcf4
child 5506 202f599c92aa
--- a/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java	Wed Mar 25 12:24:30 2009 -0700
+++ b/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java	Thu Mar 26 11:59:07 2009 -0700
@@ -166,6 +166,11 @@
         static final int SIGNAL    = -1;
         /** waitStatus value to indicate thread is waiting on condition */
         static final int CONDITION = -2;
+        /**
+         * waitStatus value to indicate the next acquireShared should
+         * unconditionally propagate
+         */
+        static final int PROPAGATE = -3;
 
         /**
          * Status field, taking on only the values:
@@ -180,10 +185,16 @@
          *               Nodes never leave this state. In particular,
          *               a thread with cancelled node never again blocks.
          *   CONDITION:  This node is currently on a condition queue.
-         *               It will not be used as a sync queue node until
-         *               transferred. (Use of this value here
-         *               has nothing to do with the other uses
-         *               of the field, but simplifies mechanics.)
+         *               It will not be used as a sync queue node
+         *               until transferred, at which time the status
+         *               will be set to 0. (Use of this value here has
+         *               nothing to do with the other uses of the
+         *               field, but simplifies mechanics.)
+         *   PROPAGATE:  A releaseShared should be propagated to other
+         *               nodes. This is set (for head node only) in
+         *               doReleaseShared to ensure propagation
+         *               continues, even if other operations have
+         *               since intervened.
          *   0:          None of the above
          *
          * The values are arranged numerically to simplify use.
@@ -403,10 +414,13 @@
      */
     private void unparkSuccessor(Node node) {
         /*
-         * Try to clear status in anticipation of signalling.  It is
-         * OK if this fails or if status is changed by waiting thread.
+         * If status is negative (i.e., possibly needing signal) try
+         * to clear in anticipation of signalling.  It is OK if this
+         * fails or if status is changed by waiting thread.
          */
-        compareAndSetWaitStatus(node, Node.SIGNAL, 0);
+        int ws = node.waitStatus;
+        if (ws < 0)
+            compareAndSetWaitStatus(node, ws, 0);
 
         /*
          * Thread to unpark is held in successor, which is normally
@@ -426,23 +440,70 @@
     }
 
     /**
+     * Release action for shared mode -- signal successor and ensure
+     * propagation. (Note: For exclusive mode, release just amounts
+     * to calling unparkSuccessor of head if it needs signal.)
+     */
+    private void doReleaseShared() {
+        /*
+         * Ensure that a release propagates, even if there are other
+         * in-progress acquires/releases.  This proceeds in the usual
+         * way of trying to unparkSuccessor of head if it needs
+         * signal. But if it does not, status is set to PROPAGATE to
+         * ensure that upon release, propagation continues.
+         * Additionally, we must loop in case a new node is added
+         * while we are doing this. Also, unlike other uses of
+         * unparkSuccessor, we need to know if CAS to reset status
+         * fails, if so rechecking.
+         */
+        for (;;) {
+            Node h = head;
+            if (h != null && h != tail) {
+                int ws = h.waitStatus;
+                if (ws == Node.SIGNAL) {
+                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
+                        continue;            // loop to recheck cases
+                    unparkSuccessor(h);
+                }
+                else if (ws == 0 &&
+                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
+                    continue;                // loop on failed CAS
+            }
+            if (h == head)                   // loop if head changed
+                break;
+        }
+    }
+
+    /**
      * Sets head of queue, and checks if successor may be waiting
-     * in shared mode, if so propagating if propagate > 0.
+     * in shared mode, if so propagating if either propagate > 0 or
+     * PROPAGATE status was set.
      *
-     * @param pred the node holding waitStatus for node
      * @param node the node
      * @param propagate the return value from a tryAcquireShared
      */
     private void setHeadAndPropagate(Node node, long propagate) {
+        Node h = head; // Record old head for check below
         setHead(node);
-        if (propagate > 0 && node.waitStatus != 0) {
-            /*
-             * Don't bother fully figuring out successor.  If it
-             * looks null, call unparkSuccessor anyway to be safe.
-             */
+        /*
+         * Try to signal next queued node if:
+         *   Propagation was indicated by caller,
+         *     or was recorded (as h.waitStatus) by a previous operation
+         *     (note: this uses sign-check of waitStatus because
+         *      PROPAGATE status may transition to SIGNAL.)
+         * and
+         *   The next node is waiting in shared mode,
+         *     or we don't know, because it appears null
+         *
+         * The conservatism in both of these checks may cause
+         * unnecessary wake-ups, but only when there are multiple
+         * racing acquires/releases, so most need signals now or soon
+         * anyway.
+         */
+        if (propagate > 0 || h == null || h.waitStatus < 0) {
             Node s = node.next;
             if (s == null || s.isShared())
-                unparkSuccessor(node);
+                doReleaseShared();
         }
     }
 
@@ -465,23 +526,27 @@
         while (pred.waitStatus > 0)
             node.prev = pred = pred.prev;
 
-        // Getting this before setting waitStatus ensures staleness
+        // predNext is the apparent node to unsplice. CASes below will
+        // fail if not, in which case, we lost race vs another cancel
+        // or signal, so no further action is necessary.
         Node predNext = pred.next;
 
-        // Can use unconditional write instead of CAS here
+        // Can use unconditional write instead of CAS here.
+        // After this atomic step, other Nodes can skip past us.
+        // Before, we are free of interference from other threads.
         node.waitStatus = Node.CANCELLED;
 
-        // If we are the tail, remove ourselves
+        // If we are the tail, remove ourselves.
         if (node == tail && compareAndSetTail(node, pred)) {
             compareAndSetNext(pred, predNext, null);
         } else {
-            // If "active" predecessor found...
-            if (pred != head
-                && (pred.waitStatus == Node.SIGNAL
-                    || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
-                && pred.thread != null) {
-
-                // If successor is active, set predecessor's next link
+            // If successor needs signal, try to set pred's next-link
+            // so it will get one. Otherwise wake it up to propagate.
+            int ws;
+            if (pred != head &&
+                ((ws = pred.waitStatus) == Node.SIGNAL ||
+                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
+                pred.thread != null) {
                 Node next = node.next;
                 if (next != null && next.waitStatus <= 0)
                     compareAndSetNext(pred, predNext, next);
@@ -503,14 +568,14 @@
      * @return {@code true} if thread should block
      */
     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
-        int s = pred.waitStatus;
-        if (s < 0)
+        int ws = pred.waitStatus;
+        if (ws == Node.SIGNAL)
             /*
              * This node has already set status asking a release
              * to signal it, so it can safely park.
              */
             return true;
-        if (s > 0) {
+        if (ws > 0) {
             /*
              * Predecessor was cancelled. Skip over predecessors and
              * indicate retry.
@@ -519,14 +584,14 @@
                 node.prev = pred = pred.prev;
             } while (pred.waitStatus > 0);
             pred.next = node;
-        }
-        else
+        } else {
             /*
-             * Indicate that we need a signal, but don't park yet. Caller
-             * will need to retry to make sure it cannot acquire before
-             * parking.
+             * waitStatus must be 0 or PROPAGATE.  Indicate that we
+             * need a signal, but don't park yet.  Caller will need to
+             * retry to make sure it cannot acquire before parking.
              */
-            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
+            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
+        }
         return false;
     }
 
@@ -1046,9 +1111,7 @@
      */
     public final boolean releaseShared(long arg) {
         if (tryReleaseShared(arg)) {
-            Node h = head;
-            if (h != null && h.waitStatus != 0)
-                unparkSuccessor(h);
+            doReleaseShared();
             return true;
         }
         return false;
@@ -1390,8 +1453,8 @@
          * case the waitStatus can be transiently and harmlessly wrong).
          */
         Node p = enq(node);
-        int c = p.waitStatus;
-        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
+        int ws = p.waitStatus;
+        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
             LockSupport.unpark(node.thread);
         return true;
     }