jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
changeset 2430 975a98680ae8
parent 2165 98373487fcf4
child 5506 202f599c92aa
equal deleted inserted replaced
2429:8ea224e457c6 2430:975a98680ae8
   164         static final int CANCELLED =  1;
   164         static final int CANCELLED =  1;
   165         /** waitStatus value to indicate successor's thread needs unparking */
   165         /** waitStatus value to indicate successor's thread needs unparking */
   166         static final int SIGNAL    = -1;
   166         static final int SIGNAL    = -1;
   167         /** waitStatus value to indicate thread is waiting on condition */
   167         /** waitStatus value to indicate thread is waiting on condition */
   168         static final int CONDITION = -2;
   168         static final int CONDITION = -2;
       
   169         /**
       
   170          * waitStatus value to indicate the next acquireShared should
       
   171          * unconditionally propagate
       
   172          */
       
   173         static final int PROPAGATE = -3;
   169 
   174 
   170         /**
   175         /**
   171          * Status field, taking on only the values:
   176          * Status field, taking on only the values:
   172          *   SIGNAL:     The successor of this node is (or will soon be)
   177          *   SIGNAL:     The successor of this node is (or will soon be)
   173          *               blocked (via park), so the current node must
   178          *               blocked (via park), so the current node must
   178          *               on failure, block.
   183          *               on failure, block.
   179          *   CANCELLED:  This node is cancelled due to timeout or interrupt.
   184          *   CANCELLED:  This node is cancelled due to timeout or interrupt.
   180          *               Nodes never leave this state. In particular,
   185          *               Nodes never leave this state. In particular,
   181          *               a thread with cancelled node never again blocks.
   186          *               a thread with cancelled node never again blocks.
   182          *   CONDITION:  This node is currently on a condition queue.
   187          *   CONDITION:  This node is currently on a condition queue.
   183          *               It will not be used as a sync queue node until
   188          *               It will not be used as a sync queue node
   184          *               transferred. (Use of this value here
   189          *               until transferred, at which time the status
   185          *               has nothing to do with the other uses
   190          *               will be set to 0. (Use of this value here has
   186          *               of the field, but simplifies mechanics.)
   191          *               nothing to do with the other uses of the
       
   192          *               field, but simplifies mechanics.)
       
   193          *   PROPAGATE:  A releaseShared should be propagated to other
       
   194          *               nodes. This is set (for head node only) in
       
   195          *               doReleaseShared to ensure propagation
       
   196          *               continues, even if other operations have
       
   197          *               since intervened.
   187          *   0:          None of the above
   198          *   0:          None of the above
   188          *
   199          *
   189          * The values are arranged numerically to simplify use.
   200          * The values are arranged numerically to simplify use.
   190          * Non-negative values mean that a node doesn't need to
   201          * Non-negative values mean that a node doesn't need to
   191          * signal. So, most code doesn't need to check for particular
   202          * signal. So, most code doesn't need to check for particular
   401      *
   412      *
   402      * @param node the node
   413      * @param node the node
   403      */
   414      */
   404     private void unparkSuccessor(Node node) {
   415     private void unparkSuccessor(Node node) {
   405         /*
   416         /*
   406          * Try to clear status in anticipation of signalling.  It is
   417          * If status is negative (i.e., possibly needing signal) try
   407          * OK if this fails or if status is changed by waiting thread.
   418          * to clear in anticipation of signalling.  It is OK if this
   408          */
   419          * fails or if status is changed by waiting thread.
   409         compareAndSetWaitStatus(node, Node.SIGNAL, 0);
   420          */
       
   421         int ws = node.waitStatus;
       
   422         if (ws < 0)
       
   423             compareAndSetWaitStatus(node, ws, 0);
   410 
   424 
   411         /*
   425         /*
   412          * Thread to unpark is held in successor, which is normally
   426          * Thread to unpark is held in successor, which is normally
   413          * just the next node.  But if cancelled or apparently null,
   427          * just the next node.  But if cancelled or apparently null,
   414          * traverse backwards from tail to find the actual
   428          * traverse backwards from tail to find the actual
   424         if (s != null)
   438         if (s != null)
   425             LockSupport.unpark(s.thread);
   439             LockSupport.unpark(s.thread);
   426     }
   440     }
   427 
   441 
   428     /**
   442     /**
       
   443      * Release action for shared mode -- signal successor and ensure
       
   444      * propagation. (Note: For exclusive mode, release just amounts
       
   445      * to calling unparkSuccessor of head if it needs signal.)
       
   446      */
       
   447     private void doReleaseShared() {
       
   448         /*
       
   449          * Ensure that a release propagates, even if there are other
       
   450          * in-progress acquires/releases.  This proceeds in the usual
       
   451          * way of trying to unparkSuccessor of head if it needs
       
   452          * signal. But if it does not, status is set to PROPAGATE to
       
   453          * ensure that upon release, propagation continues.
       
   454          * Additionally, we must loop in case a new node is added
       
   455          * while we are doing this. Also, unlike other uses of
       
   456          * unparkSuccessor, we need to know if CAS to reset status
       
   457          * fails, if so rechecking.
       
   458          */
       
   459         for (;;) {
       
   460             Node h = head;
       
   461             if (h != null && h != tail) {
       
   462                 int ws = h.waitStatus;
       
   463                 if (ws == Node.SIGNAL) {
       
   464                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
       
   465                         continue;            // loop to recheck cases
       
   466                     unparkSuccessor(h);
       
   467                 }
       
   468                 else if (ws == 0 &&
       
   469                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
       
   470                     continue;                // loop on failed CAS
       
   471             }
       
   472             if (h == head)                   // loop if head changed
       
   473                 break;
       
   474         }
       
   475     }
       
   476 
       
   477     /**
   429      * Sets head of queue, and checks if successor may be waiting
   478      * Sets head of queue, and checks if successor may be waiting
   430      * in shared mode, if so propagating if propagate > 0.
   479      * in shared mode, if so propagating if either propagate > 0 or
   431      *
   480      * PROPAGATE status was set.
   432      * @param pred the node holding waitStatus for node
   481      *
   433      * @param node the node
   482      * @param node the node
   434      * @param propagate the return value from a tryAcquireShared
   483      * @param propagate the return value from a tryAcquireShared
   435      */
   484      */
   436     private void setHeadAndPropagate(Node node, long propagate) {
   485     private void setHeadAndPropagate(Node node, long propagate) {
       
   486         Node h = head; // Record old head for check below
   437         setHead(node);
   487         setHead(node);
   438         if (propagate > 0 && node.waitStatus != 0) {
   488         /*
   439             /*
   489          * Try to signal next queued node if:
   440              * Don't bother fully figuring out successor.  If it
   490          *   Propagation was indicated by caller,
   441              * looks null, call unparkSuccessor anyway to be safe.
   491          *     or was recorded (as h.waitStatus) by a previous operation
   442              */
   492          *     (note: this uses sign-check of waitStatus because
       
   493          *      PROPAGATE status may transition to SIGNAL.)
       
   494          * and
       
   495          *   The next node is waiting in shared mode,
       
   496          *     or we don't know, because it appears null
       
   497          *
       
   498          * The conservatism in both of these checks may cause
       
   499          * unnecessary wake-ups, but only when there are multiple
       
   500          * racing acquires/releases, so most need signals now or soon
       
   501          * anyway.
       
   502          */
       
   503         if (propagate > 0 || h == null || h.waitStatus < 0) {
   443             Node s = node.next;
   504             Node s = node.next;
   444             if (s == null || s.isShared())
   505             if (s == null || s.isShared())
   445                 unparkSuccessor(node);
   506                 doReleaseShared();
   446         }
   507         }
   447     }
   508     }
   448 
   509 
   449     // Utilities for various versions of acquire
   510     // Utilities for various versions of acquire
   450 
   511 
   463         // Skip cancelled predecessors
   524         // Skip cancelled predecessors
   464         Node pred = node.prev;
   525         Node pred = node.prev;
   465         while (pred.waitStatus > 0)
   526         while (pred.waitStatus > 0)
   466             node.prev = pred = pred.prev;
   527             node.prev = pred = pred.prev;
   467 
   528 
   468         // Getting this before setting waitStatus ensures staleness
   529         // predNext is the apparent node to unsplice. CASes below will
       
   530         // fail if not, in which case, we lost race vs another cancel
       
   531         // or signal, so no further action is necessary.
   469         Node predNext = pred.next;
   532         Node predNext = pred.next;
   470 
   533 
   471         // Can use unconditional write instead of CAS here
   534         // Can use unconditional write instead of CAS here.
       
   535         // After this atomic step, other Nodes can skip past us.
       
   536         // Before, we are free of interference from other threads.
   472         node.waitStatus = Node.CANCELLED;
   537         node.waitStatus = Node.CANCELLED;
   473 
   538 
   474         // If we are the tail, remove ourselves
   539         // If we are the tail, remove ourselves.
   475         if (node == tail && compareAndSetTail(node, pred)) {
   540         if (node == tail && compareAndSetTail(node, pred)) {
   476             compareAndSetNext(pred, predNext, null);
   541             compareAndSetNext(pred, predNext, null);
   477         } else {
   542         } else {
   478             // If "active" predecessor found...
   543             // If successor needs signal, try to set pred's next-link
   479             if (pred != head
   544             // so it will get one. Otherwise wake it up to propagate.
   480                 && (pred.waitStatus == Node.SIGNAL
   545             int ws;
   481                     || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
   546             if (pred != head &&
   482                 && pred.thread != null) {
   547                 ((ws = pred.waitStatus) == Node.SIGNAL ||
   483 
   548                  (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
   484                 // If successor is active, set predecessor's next link
   549                 pred.thread != null) {
   485                 Node next = node.next;
   550                 Node next = node.next;
   486                 if (next != null && next.waitStatus <= 0)
   551                 if (next != null && next.waitStatus <= 0)
   487                     compareAndSetNext(pred, predNext, next);
   552                     compareAndSetNext(pred, predNext, next);
   488             } else {
   553             } else {
   489                 unparkSuccessor(node);
   554                 unparkSuccessor(node);
   501      * @param pred node's predecessor holding status
   566      * @param pred node's predecessor holding status
   502      * @param node the node
   567      * @param node the node
   503      * @return {@code true} if thread should block
   568      * @return {@code true} if thread should block
   504      */
   569      */
   505     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
   570     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
   506         int s = pred.waitStatus;
   571         int ws = pred.waitStatus;
   507         if (s < 0)
   572         if (ws == Node.SIGNAL)
   508             /*
   573             /*
   509              * This node has already set status asking a release
   574              * This node has already set status asking a release
   510              * to signal it, so it can safely park.
   575              * to signal it, so it can safely park.
   511              */
   576              */
   512             return true;
   577             return true;
   513         if (s > 0) {
   578         if (ws > 0) {
   514             /*
   579             /*
   515              * Predecessor was cancelled. Skip over predecessors and
   580              * Predecessor was cancelled. Skip over predecessors and
   516              * indicate retry.
   581              * indicate retry.
   517              */
   582              */
   518             do {
   583             do {
   519                 node.prev = pred = pred.prev;
   584                 node.prev = pred = pred.prev;
   520             } while (pred.waitStatus > 0);
   585             } while (pred.waitStatus > 0);
   521             pred.next = node;
   586             pred.next = node;
   522         }
   587         } else {
   523         else
       
   524             /*
   588             /*
   525              * Indicate that we need a signal, but don't park yet. Caller
   589              * waitStatus must be 0 or PROPAGATE.  Indicate that we
   526              * will need to retry to make sure it cannot acquire before
   590              * need a signal, but don't park yet.  Caller will need to
   527              * parking.
   591              * retry to make sure it cannot acquire before parking.
   528              */
   592              */
   529             compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
   593             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
       
   594         }
   530         return false;
   595         return false;
   531     }
   596     }
   532 
   597 
   533     /**
   598     /**
   534      * Convenience method to interrupt current thread.
   599      * Convenience method to interrupt current thread.
  1044      *        and can represent anything you like.
  1109      *        and can represent anything you like.
  1045      * @return the value returned from {@link #tryReleaseShared}
  1110      * @return the value returned from {@link #tryReleaseShared}
  1046      */
  1111      */
  1047     public final boolean releaseShared(long arg) {
  1112     public final boolean releaseShared(long arg) {
  1048         if (tryReleaseShared(arg)) {
  1113         if (tryReleaseShared(arg)) {
  1049             Node h = head;
  1114             doReleaseShared();
  1050             if (h != null && h.waitStatus != 0)
       
  1051                 unparkSuccessor(h);
       
  1052             return true;
  1115             return true;
  1053         }
  1116         }
  1054         return false;
  1117         return false;
  1055     }
  1118     }
  1056 
  1119 
  1388          * indicate that thread is (probably) waiting. If cancelled or
  1451          * indicate that thread is (probably) waiting. If cancelled or
  1389          * attempt to set waitStatus fails, wake up to resync (in which
  1452          * attempt to set waitStatus fails, wake up to resync (in which
  1390          * case the waitStatus can be transiently and harmlessly wrong).
  1453          * case the waitStatus can be transiently and harmlessly wrong).
  1391          */
  1454          */
  1392         Node p = enq(node);
  1455         Node p = enq(node);
  1393         int c = p.waitStatus;
  1456         int ws = p.waitStatus;
  1394         if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
  1457         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  1395             LockSupport.unpark(node.thread);
  1458             LockSupport.unpark(node.thread);
  1396         return true;
  1459         return true;
  1397     }
  1460     }
  1398 
  1461 
  1399     /**
  1462     /**