src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
branchdatagramsocketimpl-branch
changeset 58678 9cf78a70fa4f
parent 49565 b5705ade8c8d
child 58679 9c3209ff7550
equal deleted inserted replaced
58677:13588c901957 58678:9cf78a70fa4f
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    34  */
    35 
    35 
    36 package java.util.concurrent.locks;
    36 package java.util.concurrent.locks;
    37 
    37 
    38 import java.lang.invoke.MethodHandles;
       
    39 import java.lang.invoke.VarHandle;
       
    40 import java.util.ArrayList;
    38 import java.util.ArrayList;
    41 import java.util.Collection;
    39 import java.util.Collection;
    42 import java.util.Date;
    40 import java.util.Date;
    43 import java.util.concurrent.TimeUnit;
    41 import java.util.concurrent.TimeUnit;
    44 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node;
    42 import java.util.concurrent.ForkJoinPool;
       
    43 import jdk.internal.misc.Unsafe;
    45 
    44 
    46 /**
    45 /**
    47  * A version of {@link AbstractQueuedSynchronizer} in
    46  * A version of {@link AbstractQueuedSynchronizer} in
    48  * which synchronization state is maintained as a {@code long}.
    47  * which synchronization state is maintained as a {@code long}.
    49  * This class has exactly the same structure, properties, and methods
    48  * This class has exactly the same structure, properties, and methods
    71      * exactly cloned from AbstractQueuedSynchronizer, replacing class
    70      * exactly cloned from AbstractQueuedSynchronizer, replacing class
    72      * name and changing ints related with sync state to longs. Please
    71      * name and changing ints related with sync state to longs. Please
    73      * keep it that way.
    72      * keep it that way.
    74      */
    73      */
    75 
    74 
    76     /**
    75     // Node status bits, also used as argument and return values
    77      * Creates a new {@code AbstractQueuedLongSynchronizer} instance
    76     static final int WAITING   = 1;          // must be 1
    78      * with initial synchronization state of zero.
    77     static final int CANCELLED = 0x80000000; // must be negative
    79      */
    78     static final int COND      = 2;          // in a condition wait
    80     protected AbstractQueuedLongSynchronizer() { }
    79 
    81 
    80     /** CLH Nodes */
    82     /**
    81     abstract static class Node {
    83      * Head of the wait queue, lazily initialized.  Except for
    82         volatile Node prev;       // initially attached via casTail
    84      * initialization, it is modified only via method setHead.  Note:
    83         volatile Node next;       // visibly nonnull when signallable
    85      * If head exists, its waitStatus is guaranteed not to be
    84         Thread waiter;            // visibly nonnull when enqueued
    86      * CANCELLED.
    85         volatile int status;      // written by owner, atomic bit ops by others
       
    86 
       
    87         // methods for atomic operations
       
    88         final boolean casPrev(Node c, Node v) {  // for cleanQueue
       
    89             return U.weakCompareAndSetReference(this, PREV, c, v);
       
    90         }
       
    91         final boolean casNext(Node c, Node v) {  // for cleanQueue
       
    92             return U.weakCompareAndSetReference(this, NEXT, c, v);
       
    93         }
       
    94         final int getAndUnsetStatus(int v) {     // for signalling
       
    95             return U.getAndBitwiseAndInt(this, STATUS, ~v);
       
    96         }
       
    97         final void setPrevRelaxed(Node p) {      // for off-queue assignment
       
    98             U.putReference(this, PREV, p);
       
    99         }
       
   100         final void setStatusRelaxed(int s) {     // for off-queue assignment
       
   101             U.putInt(this, STATUS, s);
       
   102         }
       
   103         final void clearStatus() {               // for reducing unneeded signals
       
   104             U.putIntOpaque(this, STATUS, 0);
       
   105         }
       
   106 
       
   107         private static final long STATUS
       
   108             = U.objectFieldOffset(Node.class, "status");
       
   109         private static final long NEXT
       
   110             = U.objectFieldOffset(Node.class, "next");
       
   111         private static final long PREV
       
   112             = U.objectFieldOffset(Node.class, "prev");
       
   113     }
       
   114 
       
   115     // Concrete classes tagged by type
       
   116     static final class ExclusiveNode extends Node { }
       
   117     static final class SharedNode extends Node { }
       
   118 
       
   119     static final class ConditionNode extends Node
       
   120         implements ForkJoinPool.ManagedBlocker {
       
   121         ConditionNode nextWaiter;            // link to next waiting node
       
   122 
       
   123         /**
       
   124          * Allows Conditions to be used in ForkJoinPools without
       
   125          * risking fixed pool exhaustion. This is usable only for
       
   126          * untimed Condition waits, not timed versions.
       
   127          */
       
   128         public final boolean isReleasable() {
       
   129             return status <= 1 || Thread.currentThread().isInterrupted();
       
   130         }
       
   131 
       
   132         public final boolean block() {
       
   133             while (!isReleasable()) LockSupport.park();
       
   134             return true;
       
   135         }
       
   136     }
       
   137 
       
   138     /**
       
   139      * Head of the wait queue, lazily initialized.
    87      */
   140      */
    88     private transient volatile Node head;
   141     private transient volatile Node head;
    89 
   142 
    90     /**
   143     /**
    91      * Tail of the wait queue, lazily initialized.  Modified only via
   144      * Tail of the wait queue. After initialization, modified only via casTail.
    92      * method enq to add new wait node.
       
    93      */
   145      */
    94     private transient volatile Node tail;
   146     private transient volatile Node tail;
    95 
   147 
    96     /**
   148     /**
    97      * The synchronization state.
   149      * The synchronization state.
   111      * Sets the value of synchronization state.
   163      * Sets the value of synchronization state.
   112      * This operation has memory semantics of a {@code volatile} write.
   164      * This operation has memory semantics of a {@code volatile} write.
   113      * @param newState the new state value
   165      * @param newState the new state value
   114      */
   166      */
   115     protected final void setState(long newState) {
   167     protected final void setState(long newState) {
   116         // See JDK-8180620: Clarify VarHandle mixed-access subtleties
   168         state = newState;
   117         STATE.setVolatile(this, newState);
       
   118     }
   169     }
   119 
   170 
   120     /**
   171     /**
   121      * Atomically sets synchronization state to the given updated
   172      * Atomically sets synchronization state to the given updated
   122      * value if the current state value equals the expected value.
   173      * value if the current state value equals the expected value.
   127      * @param update the new value
   178      * @param update the new value
   128      * @return {@code true} if successful. False return indicates that the actual
   179      * @return {@code true} if successful. False return indicates that the actual
   129      *         value was not equal to the expected value.
   180      *         value was not equal to the expected value.
   130      */
   181      */
   131     protected final boolean compareAndSetState(long expect, long update) {
   182     protected final boolean compareAndSetState(long expect, long update) {
   132         return STATE.compareAndSet(this, expect, update);
   183         return U.compareAndSetLong(this, STATE, expect, update);
   133     }
   184     }
   134 
   185 
   135     // Queuing utilities
   186     // Queuing utilities
   136 
   187 
   137     /**
   188     private boolean casTail(Node c, Node v) {
   138      * The number of nanoseconds for which it is faster to spin
   189         return U.compareAndSetReference(this, TAIL, c, v);
   139      * rather than to use timed park. A rough estimate suffices
   190     }
   140      * to improve responsiveness with very short timeouts.
   191 
   141      */
   192     /** tries once to CAS a new dummy node for head */
   142     static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
   193     private void tryInitializeHead() {
   143 
   194         Node h = new ExclusiveNode();
   144     /**
   195         if (U.compareAndSetReference(this, HEAD, null, h))
   145      * Inserts node into queue, initializing if necessary. See picture above.
   196             tail = h;
   146      * @param node the node to insert
   197     }
   147      * @return node's predecessor
   198 
   148      */
   199     /**
   149     private Node enq(Node node) {
   200      * Enqueues the node unless null. (Currently used only for
       
   201      * ConditionNodes; other cases are interleaved with acquires.)
       
   202      */
       
   203     final void enqueue(Node node) {
       
   204         if (node != null) {
       
   205             for (;;) {
       
   206                 Node t = tail;
       
   207                 node.setPrevRelaxed(t);        // avoid unnecessary fence
       
   208                 if (t == null)                 // initialize
       
   209                     tryInitializeHead();
       
   210                 else if (casTail(t, node)) {
       
   211                     t.next = node;
       
   212                     if (t.status < 0)          // wake up to clean link
       
   213                         LockSupport.unpark(node.waiter);
       
   214                     break;
       
   215                 }
       
   216             }
       
   217         }
       
   218     }
       
   219 
       
   220     /** Returns true if node is found in traversal from tail */
       
   221     final boolean isEnqueued(Node node) {
       
   222         for (Node t = tail; t != null; t = t.prev)
       
   223             if (t == node)
       
   224                 return true;
       
   225         return false;
       
   226     }
       
   227 
       
   228     /**
       
   229      * Wakes up the successor of given node, if one exists, and unsets its
       
   230      * WAITING status to avoid park race. This may fail to wake up an
       
   231      * eligible thread when one or more have been cancelled, but
       
   232      * cancelAcquire ensures liveness.
       
   233      */
       
   234     private static void signalNext(Node h) {
       
   235         Node s;
       
   236         if (h != null && (s = h.next) != null && s.status != 0) {
       
   237             s.getAndUnsetStatus(WAITING);
       
   238             LockSupport.unpark(s.waiter);
       
   239         }
       
   240     }
       
   241 
       
   242     /** Wakes up the given node if in shared mode */
       
   243     private static void signalNextIfShared(Node h) {
       
   244         Node s;
       
   245         if (h != null && (s = h.next) != null &&
       
   246             (s instanceof SharedNode) && s.status != 0) {
       
   247             s.getAndUnsetStatus(WAITING);
       
   248             LockSupport.unpark(s.waiter);
       
   249         }
       
   250     }
       
   251 
       
   252     /**
       
   253      * Main acquire method, invoked by all exported acquire methods.
       
   254      *
       
   255      * @param node null unless a reacquiring Condition
       
   256      * @param arg the acquire argument
       
   257      * @param shared true if shared mode else exclusive
       
   258      * @param interruptible if abort and return negative on interrupt
       
   259      * @param timed if true use timed waits
       
   260      * @param time if timed, the System.nanoTime value to timeout
       
   261      * @return positive if acquired, 0 if timed out, negative if interrupted
       
   262      */
       
   263     final int acquire(Node node, long arg, boolean shared,
       
   264                       boolean interruptible, boolean timed, long time) {
       
   265         Thread current = Thread.currentThread();
       
   266         byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
       
   267         boolean interrupted = false, first = false;
       
   268         Node pred = null;                // predecessor of node when enqueued
       
   269 
       
   270         /*
       
   271          * Repeatedly:
       
   272          *  Check if node now first
       
   273          *    if so, ensure head stable, else ensure valid predecessor
       
   274          *  if node is first or not yet enqueued, try acquiring
       
   275          *  else if node not yet created, create it
       
   276          *  else if not yet enqueued, try once to enqueue
       
   277          *  else if woken from park, retry (up to postSpins times)
       
   278          *  else if WAITING status not set, set and retry
       
   279          *  else park and clear WAITING status, and check cancellation
       
   280          */
       
   281 
   150         for (;;) {
   282         for (;;) {
   151             Node oldTail = tail;
   283             if (!first && (pred = (node == null) ? null : node.prev) != null &&
   152             if (oldTail != null) {
   284                 !(first = (head == pred))) {
   153                 node.setPrevRelaxed(oldTail);
   285                 if (pred.status < 0) {
   154                 if (compareAndSetTail(oldTail, node)) {
   286                     cleanQueue();           // predecessor cancelled
   155                     oldTail.next = node;
   287                     continue;
   156                     return oldTail;
   288                 } else if (pred.prev == null) {
       
   289                     Thread.onSpinWait();    // ensure serialization
       
   290                     continue;
   157                 }
   291                 }
       
   292             }
       
   293             if (first || pred == null) {
       
   294                 boolean acquired;
       
   295                 try {
       
   296                     if (shared)
       
   297                         acquired = (tryAcquireShared(arg) >= 0);
       
   298                     else
       
   299                         acquired = tryAcquire(arg);
       
   300                 } catch (Throwable ex) {
       
   301                     cancelAcquire(node, interrupted, false);
       
   302                     throw ex;
       
   303                 }
       
   304                 if (acquired) {
       
   305                     if (first) {
       
   306                         node.prev = null;
       
   307                         head = node;
       
   308                         pred.next = null;
       
   309                         node.waiter = null;
       
   310                         if (shared)
       
   311                             signalNextIfShared(node);
       
   312                         if (interrupted)
       
   313                             current.interrupt();
       
   314                     }
       
   315                     return 1;
       
   316                 }
       
   317             }
       
   318             if (node == null) {                 // allocate; retry before enqueue
       
   319                 if (shared)
       
   320                     node = new SharedNode();
       
   321                 else
       
   322                     node = new ExclusiveNode();
       
   323             } else if (pred == null) {          // try to enqueue
       
   324                 node.waiter = current;
       
   325                 Node t = tail;
       
   326                 node.setPrevRelaxed(t);         // avoid unnecessary fence
       
   327                 if (t == null)
       
   328                     tryInitializeHead();
       
   329                 else if (!casTail(t, node))
       
   330                     node.setPrevRelaxed(null);  // back out
       
   331                 else
       
   332                     t.next = node;
       
   333             } else if (first && spins != 0) {
       
   334                 --spins;                        // reduce unfairness on rewaits
       
   335                 Thread.onSpinWait();
       
   336             } else if (node.status == 0) {
       
   337                 node.status = WAITING;          // enable signal and recheck
   158             } else {
   338             } else {
   159                 initializeSyncQueue();
   339                 long nanos;
   160             }
   340                 spins = postSpins = (byte)((postSpins << 1) | 1);
   161         }
   341                 if (!timed)
   162     }
   342                     LockSupport.park(this);
   163 
   343                 else if ((nanos = time - System.nanoTime()) > 0L)
   164     /**
   344                     LockSupport.parkNanos(this, nanos);
   165      * Creates and enqueues node for current thread and given mode.
   345                 else
   166      *
   346                     break;
   167      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
   347                 node.clearStatus();
   168      * @return the new node
   348                 if ((interrupted |= Thread.interrupted()) && interruptible)
   169      */
   349                     break;
   170     private Node addWaiter(Node mode) {
   350             }
   171         Node node = new Node(mode);
   351         }
   172 
   352         return cancelAcquire(node, interrupted, interruptible);
   173         for (;;) {
   353     }
   174             Node oldTail = tail;
   354 
   175             if (oldTail != null) {
   355     /**
   176                 node.setPrevRelaxed(oldTail);
   356      * Possibly repeatedly traverses from tail, unsplicing cancelled
   177                 if (compareAndSetTail(oldTail, node)) {
   357      * nodes until none are found.
   178                     oldTail.next = node;
   358      */
   179                     return node;
   359     private void cleanQueue() {
       
   360         for (;;) {                               // restart point
       
   361             for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
       
   362                 if (q == null || (p = q.prev) == null)
       
   363                     return;                      // end of list
       
   364                 if (s == null ? tail != q : (s.prev != q || s.status < 0))
       
   365                     break;                       // inconsistent
       
   366                 if (q.status < 0) {              // cancelled
       
   367                     if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
       
   368                         q.prev == p) {
       
   369                         p.casNext(q, s);         // OK if fails
       
   370                         if (p.prev == null)
       
   371                             signalNext(p);
       
   372                     }
       
   373                     break;
   180                 }
   374                 }
   181             } else {
   375                 if ((n = p.next) != q) {         // help finish
   182                 initializeSyncQueue();
   376                     if (n != null && q.prev == p) {
   183             }
   377                         p.casNext(n, q);
   184         }
   378                         if (p.prev == null)
   185     }
   379                             signalNext(p);
   186 
   380                     }
   187     /**
   381                     break;
   188      * Sets head of queue to be node, thus dequeuing. Called only by
       
   189      * acquire methods.  Also nulls out unused fields for sake of GC
       
   190      * and to suppress unnecessary signals and traversals.
       
   191      *
       
   192      * @param node the node
       
   193      */
       
   194     private void setHead(Node node) {
       
   195         head = node;
       
   196         node.thread = null;
       
   197         node.prev = null;
       
   198     }
       
   199 
       
   200     /**
       
   201      * Wakes up node's successor, if one exists.
       
   202      *
       
   203      * @param node the node
       
   204      */
       
   205     private void unparkSuccessor(Node node) {
       
   206         /*
       
   207          * If status is negative (i.e., possibly needing signal) try
       
   208          * to clear in anticipation of signalling.  It is OK if this
       
   209          * fails or if status is changed by waiting thread.
       
   210          */
       
   211         int ws = node.waitStatus;
       
   212         if (ws < 0)
       
   213             node.compareAndSetWaitStatus(ws, 0);
       
   214 
       
   215         /*
       
   216          * Thread to unpark is held in successor, which is normally
       
   217          * just the next node.  But if cancelled or apparently null,
       
   218          * traverse backwards from tail to find the actual
       
   219          * non-cancelled successor.
       
   220          */
       
   221         Node s = node.next;
       
   222         if (s == null || s.waitStatus > 0) {
       
   223             s = null;
       
   224             for (Node p = tail; p != node && p != null; p = p.prev)
       
   225                 if (p.waitStatus <= 0)
       
   226                     s = p;
       
   227         }
       
   228         if (s != null)
       
   229             LockSupport.unpark(s.thread);
       
   230     }
       
   231 
       
   232     /**
       
   233      * Release action for shared mode -- signals successor and ensures
       
   234      * propagation. (Note: For exclusive mode, release just amounts
       
   235      * to calling unparkSuccessor of head if it needs signal.)
       
   236      */
       
   237     private void doReleaseShared() {
       
   238         /*
       
   239          * Ensure that a release propagates, even if there are other
       
   240          * in-progress acquires/releases.  This proceeds in the usual
       
   241          * way of trying to unparkSuccessor of head if it needs
       
   242          * signal. But if it does not, status is set to PROPAGATE to
       
   243          * ensure that upon release, propagation continues.
       
   244          * Additionally, we must loop in case a new node is added
       
   245          * while we are doing this. Also, unlike other uses of
       
   246          * unparkSuccessor, we need to know if CAS to reset status
       
   247          * fails, if so rechecking.
       
   248          */
       
   249         for (;;) {
       
   250             Node h = head;
       
   251             if (h != null && h != tail) {
       
   252                 int ws = h.waitStatus;
       
   253                 if (ws == Node.SIGNAL) {
       
   254                     if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
       
   255                         continue;            // loop to recheck cases
       
   256                     unparkSuccessor(h);
       
   257                 }
   382                 }
   258                 else if (ws == 0 &&
   383                 s = q;
   259                          !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
   384                 q = q.prev;
   260                     continue;                // loop on failed CAS
   385             }
   261             }
   386         }
   262             if (h == head)                   // loop if head changed
   387     }
   263                 break;
       
   264         }
       
   265     }
       
   266 
       
   267     /**
       
   268      * Sets head of queue, and checks if successor may be waiting
       
   269      * in shared mode, if so propagating if either propagate > 0 or
       
   270      * PROPAGATE status was set.
       
   271      *
       
   272      * @param node the node
       
   273      * @param propagate the return value from a tryAcquireShared
       
   274      */
       
   275     private void setHeadAndPropagate(Node node, long propagate) {
       
   276         Node h = head; // Record old head for check below
       
   277         setHead(node);
       
   278         /*
       
   279          * Try to signal next queued node if:
       
   280          *   Propagation was indicated by caller,
       
   281          *     or was recorded (as h.waitStatus either before
       
   282          *     or after setHead) by a previous operation
       
   283          *     (note: this uses sign-check of waitStatus because
       
   284          *      PROPAGATE status may transition to SIGNAL.)
       
   285          * and
       
   286          *   The next node is waiting in shared mode,
       
   287          *     or we don't know, because it appears null
       
   288          *
       
   289          * The conservatism in both of these checks may cause
       
   290          * unnecessary wake-ups, but only when there are multiple
       
   291          * racing acquires/releases, so most need signals now or soon
       
   292          * anyway.
       
   293          */
       
   294         if (propagate > 0 || h == null || h.waitStatus < 0 ||
       
   295             (h = head) == null || h.waitStatus < 0) {
       
   296             Node s = node.next;
       
   297             if (s == null || s.isShared())
       
   298                 doReleaseShared();
       
   299         }
       
   300     }
       
   301 
       
   302     // Utilities for various versions of acquire
       
   303 
   388 
   304     /**
   389     /**
   305      * Cancels an ongoing attempt to acquire.
   390      * Cancels an ongoing attempt to acquire.
   306      *
   391      *
   307      * @param node the node
   392      * @param node the node (may be null if cancelled before enqueuing)
   308      */
   393      * @param interrupted true if thread interrupted
   309     private void cancelAcquire(Node node) {
   394      * @param interruptible if should report interruption vs reset
   310         // Ignore if node doesn't exist
   395      */
   311         if (node == null)
   396     private int cancelAcquire(Node node, boolean interrupted,
   312             return;
   397                               boolean interruptible) {
   313 
   398         if (node != null) {
   314         node.thread = null;
   399             node.waiter = null;
   315 
   400             node.status = CANCELLED;
   316         // Skip cancelled predecessors
   401             if (node.prev != null)
   317         Node pred = node.prev;
   402                 cleanQueue();
   318         while (pred.waitStatus > 0)
   403         }
   319             node.prev = pred = pred.prev;
   404         if (interrupted) {
   320 
   405             if (interruptible)
   321         // predNext is the apparent node to unsplice. CASes below will
   406                 return CANCELLED;
   322         // fail if not, in which case, we lost race vs another cancel
   407             else
   323         // or signal, so no further action is necessary, although with
   408                 Thread.currentThread().interrupt();
   324         // a possibility that a cancelled node may transiently remain
   409         }
   325         // reachable.
   410         return 0;
   326         Node predNext = pred.next;
       
   327 
       
   328         // Can use unconditional write instead of CAS here.
       
   329         // After this atomic step, other Nodes can skip past us.
       
   330         // Before, we are free of interference from other threads.
       
   331         node.waitStatus = Node.CANCELLED;
       
   332 
       
   333         // If we are the tail, remove ourselves.
       
   334         if (node == tail && compareAndSetTail(node, pred)) {
       
   335             pred.compareAndSetNext(predNext, null);
       
   336         } else {
       
   337             // If successor needs signal, try to set pred's next-link
       
   338             // so it will get one. Otherwise wake it up to propagate.
       
   339             int ws;
       
   340             if (pred != head &&
       
   341                 ((ws = pred.waitStatus) == Node.SIGNAL ||
       
   342                  (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
       
   343                 pred.thread != null) {
       
   344                 Node next = node.next;
       
   345                 if (next != null && next.waitStatus <= 0)
       
   346                     pred.compareAndSetNext(predNext, next);
       
   347             } else {
       
   348                 unparkSuccessor(node);
       
   349             }
       
   350 
       
   351             node.next = node; // help GC
       
   352         }
       
   353     }
       
   354 
       
   355     /**
       
   356      * Checks and updates status for a node that failed to acquire.
       
   357      * Returns true if thread should block. This is the main signal
       
   358      * control in all acquire loops.  Requires that pred == node.prev.
       
   359      *
       
   360      * @param pred node's predecessor holding status
       
   361      * @param node the node
       
   362      * @return {@code true} if thread should block
       
   363      */
       
   364     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       
   365         int ws = pred.waitStatus;
       
   366         if (ws == Node.SIGNAL)
       
   367             /*
       
   368              * This node has already set status asking a release
       
   369              * to signal it, so it can safely park.
       
   370              */
       
   371             return true;
       
   372         if (ws > 0) {
       
   373             /*
       
   374              * Predecessor was cancelled. Skip over predecessors and
       
   375              * indicate retry.
       
   376              */
       
   377             do {
       
   378                 node.prev = pred = pred.prev;
       
   379             } while (pred.waitStatus > 0);
       
   380             pred.next = node;
       
   381         } else {
       
   382             /*
       
   383              * waitStatus must be 0 or PROPAGATE.  Indicate that we
       
   384              * need a signal, but don't park yet.  Caller will need to
       
   385              * retry to make sure it cannot acquire before parking.
       
   386              */
       
   387             pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
       
   388         }
       
   389         return false;
       
   390     }
       
   391 
       
   392     /**
       
   393      * Convenience method to interrupt current thread.
       
   394      */
       
   395     static void selfInterrupt() {
       
   396         Thread.currentThread().interrupt();
       
   397     }
       
   398 
       
   399     /**
       
   400      * Convenience method to park and then check if interrupted.
       
   401      *
       
   402      * @return {@code true} if interrupted
       
   403      */
       
   404     private final boolean parkAndCheckInterrupt() {
       
   405         LockSupport.park(this);
       
   406         return Thread.interrupted();
       
   407     }
       
   408 
       
   409     /*
       
   410      * Various flavors of acquire, varying in exclusive/shared and
       
   411      * control modes.  Each is mostly the same, but annoyingly
       
   412      * different.  Only a little bit of factoring is possible due to
       
   413      * interactions of exception mechanics (including ensuring that we
       
   414      * cancel if tryAcquire throws exception) and other control, at
       
   415      * least not without hurting performance too much.
       
   416      */
       
   417 
       
   418     /**
       
   419      * Acquires in exclusive uninterruptible mode for thread already in
       
   420      * queue. Used by condition wait methods as well as acquire.
       
   421      *
       
   422      * @param node the node
       
   423      * @param arg the acquire argument
       
   424      * @return {@code true} if interrupted while waiting
       
   425      */
       
   426     final boolean acquireQueued(final Node node, long arg) {
       
   427         boolean interrupted = false;
       
   428         try {
       
   429             for (;;) {
       
   430                 final Node p = node.predecessor();
       
   431                 if (p == head && tryAcquire(arg)) {
       
   432                     setHead(node);
       
   433                     p.next = null; // help GC
       
   434                     return interrupted;
       
   435                 }
       
   436                 if (shouldParkAfterFailedAcquire(p, node))
       
   437                     interrupted |= parkAndCheckInterrupt();
       
   438             }
       
   439         } catch (Throwable t) {
       
   440             cancelAcquire(node);
       
   441             if (interrupted)
       
   442                 selfInterrupt();
       
   443             throw t;
       
   444         }
       
   445     }
       
   446 
       
   447     /**
       
   448      * Acquires in exclusive interruptible mode.
       
   449      * @param arg the acquire argument
       
   450      */
       
   451     private void doAcquireInterruptibly(long arg)
       
   452         throws InterruptedException {
       
   453         final Node node = addWaiter(Node.EXCLUSIVE);
       
   454         try {
       
   455             for (;;) {
       
   456                 final Node p = node.predecessor();
       
   457                 if (p == head && tryAcquire(arg)) {
       
   458                     setHead(node);
       
   459                     p.next = null; // help GC
       
   460                     return;
       
   461                 }
       
   462                 if (shouldParkAfterFailedAcquire(p, node) &&
       
   463                     parkAndCheckInterrupt())
       
   464                     throw new InterruptedException();
       
   465             }
       
   466         } catch (Throwable t) {
       
   467             cancelAcquire(node);
       
   468             throw t;
       
   469         }
       
   470     }
       
   471 
       
   472     /**
       
   473      * Acquires in exclusive timed mode.
       
   474      *
       
   475      * @param arg the acquire argument
       
   476      * @param nanosTimeout max wait time
       
   477      * @return {@code true} if acquired
       
   478      */
       
   479     private boolean doAcquireNanos(long arg, long nanosTimeout)
       
   480             throws InterruptedException {
       
   481         if (nanosTimeout <= 0L)
       
   482             return false;
       
   483         final long deadline = System.nanoTime() + nanosTimeout;
       
   484         final Node node = addWaiter(Node.EXCLUSIVE);
       
   485         try {
       
   486             for (;;) {
       
   487                 final Node p = node.predecessor();
       
   488                 if (p == head && tryAcquire(arg)) {
       
   489                     setHead(node);
       
   490                     p.next = null; // help GC
       
   491                     return true;
       
   492                 }
       
   493                 nanosTimeout = deadline - System.nanoTime();
       
   494                 if (nanosTimeout <= 0L) {
       
   495                     cancelAcquire(node);
       
   496                     return false;
       
   497                 }
       
   498                 if (shouldParkAfterFailedAcquire(p, node) &&
       
   499                     nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
       
   500                     LockSupport.parkNanos(this, nanosTimeout);
       
   501                 if (Thread.interrupted())
       
   502                     throw new InterruptedException();
       
   503             }
       
   504         } catch (Throwable t) {
       
   505             cancelAcquire(node);
       
   506             throw t;
       
   507         }
       
   508     }
       
   509 
       
   510     /**
       
   511      * Acquires in shared uninterruptible mode.
       
   512      * @param arg the acquire argument
       
   513      */
       
   514     private void doAcquireShared(long arg) {
       
   515         final Node node = addWaiter(Node.SHARED);
       
   516         boolean interrupted = false;
       
   517         try {
       
   518             for (;;) {
       
   519                 final Node p = node.predecessor();
       
   520                 if (p == head) {
       
   521                     long r = tryAcquireShared(arg);
       
   522                     if (r >= 0) {
       
   523                         setHeadAndPropagate(node, r);
       
   524                         p.next = null; // help GC
       
   525                         return;
       
   526                     }
       
   527                 }
       
   528                 if (shouldParkAfterFailedAcquire(p, node))
       
   529                     interrupted |= parkAndCheckInterrupt();
       
   530             }
       
   531         } catch (Throwable t) {
       
   532             cancelAcquire(node);
       
   533             throw t;
       
   534         } finally {
       
   535             if (interrupted)
       
   536                 selfInterrupt();
       
   537         }
       
   538     }
       
   539 
       
   540     /**
       
   541      * Acquires in shared interruptible mode.
       
   542      * @param arg the acquire argument
       
   543      */
       
   544     private void doAcquireSharedInterruptibly(long arg)
       
   545         throws InterruptedException {
       
   546         final Node node = addWaiter(Node.SHARED);
       
   547         try {
       
   548             for (;;) {
       
   549                 final Node p = node.predecessor();
       
   550                 if (p == head) {
       
   551                     long r = tryAcquireShared(arg);
       
   552                     if (r >= 0) {
       
   553                         setHeadAndPropagate(node, r);
       
   554                         p.next = null; // help GC
       
   555                         return;
       
   556                     }
       
   557                 }
       
   558                 if (shouldParkAfterFailedAcquire(p, node) &&
       
   559                     parkAndCheckInterrupt())
       
   560                     throw new InterruptedException();
       
   561             }
       
   562         } catch (Throwable t) {
       
   563             cancelAcquire(node);
       
   564             throw t;
       
   565         }
       
   566     }
       
   567 
       
   568     /**
       
   569      * Acquires in shared timed mode.
       
   570      *
       
   571      * @param arg the acquire argument
       
   572      * @param nanosTimeout max wait time
       
   573      * @return {@code true} if acquired
       
   574      */
       
   575     private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
       
   576             throws InterruptedException {
       
   577         if (nanosTimeout <= 0L)
       
   578             return false;
       
   579         final long deadline = System.nanoTime() + nanosTimeout;
       
   580         final Node node = addWaiter(Node.SHARED);
       
   581         try {
       
   582             for (;;) {
       
   583                 final Node p = node.predecessor();
       
   584                 if (p == head) {
       
   585                     long r = tryAcquireShared(arg);
       
   586                     if (r >= 0) {
       
   587                         setHeadAndPropagate(node, r);
       
   588                         p.next = null; // help GC
       
   589                         return true;
       
   590                     }
       
   591                 }
       
   592                 nanosTimeout = deadline - System.nanoTime();
       
   593                 if (nanosTimeout <= 0L) {
       
   594                     cancelAcquire(node);
       
   595                     return false;
       
   596                 }
       
   597                 if (shouldParkAfterFailedAcquire(p, node) &&
       
   598                     nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
       
   599                     LockSupport.parkNanos(this, nanosTimeout);
       
   600                 if (Thread.interrupted())
       
   601                     throw new InterruptedException();
       
   602             }
       
   603         } catch (Throwable t) {
       
   604             cancelAcquire(node);
       
   605             throw t;
       
   606         }
       
   607     }
   411     }
   608 
   412 
   609     // Main exported methods
   413     // Main exported methods
   610 
   414 
   611     /**
   415     /**
   754      * @param arg the acquire argument.  This value is conveyed to
   558      * @param arg the acquire argument.  This value is conveyed to
   755      *        {@link #tryAcquire} but is otherwise uninterpreted and
   559      *        {@link #tryAcquire} but is otherwise uninterpreted and
   756      *        can represent anything you like.
   560      *        can represent anything you like.
   757      */
   561      */
   758     public final void acquire(long arg) {
   562     public final void acquire(long arg) {
   759         if (!tryAcquire(arg) &&
   563         if (!tryAcquire(arg))
   760             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
   564             acquire(null, arg, false, false, false, 0L);
   761             selfInterrupt();
       
   762     }
   565     }
   763 
   566 
   764     /**
   567     /**
   765      * Acquires in exclusive mode, aborting if interrupted.
   568      * Acquires in exclusive mode, aborting if interrupted.
   766      * Implemented by first checking interrupt status, then invoking
   569      * Implemented by first checking interrupt status, then invoking
   774      *        {@link #tryAcquire} but is otherwise uninterpreted and
   577      *        {@link #tryAcquire} but is otherwise uninterpreted and
   775      *        can represent anything you like.
   578      *        can represent anything you like.
   776      * @throws InterruptedException if the current thread is interrupted
   579      * @throws InterruptedException if the current thread is interrupted
   777      */
   580      */
   778     public final void acquireInterruptibly(long arg)
   581     public final void acquireInterruptibly(long arg)
   779             throws InterruptedException {
   582         throws InterruptedException {
   780         if (Thread.interrupted())
   583         if (Thread.interrupted() ||
       
   584             (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
   781             throw new InterruptedException();
   585             throw new InterruptedException();
   782         if (!tryAcquire(arg))
       
   783             doAcquireInterruptibly(arg);
       
   784     }
   586     }
   785 
   587 
   786     /**
   588     /**
   787      * Attempts to acquire in exclusive mode, aborting if interrupted,
   589      * Attempts to acquire in exclusive mode, aborting if interrupted,
   788      * and failing if the given timeout elapses.  Implemented by first
   590      * and failing if the given timeout elapses.  Implemented by first
   799      * @param nanosTimeout the maximum number of nanoseconds to wait
   601      * @param nanosTimeout the maximum number of nanoseconds to wait
   800      * @return {@code true} if acquired; {@code false} if timed out
   602      * @return {@code true} if acquired; {@code false} if timed out
   801      * @throws InterruptedException if the current thread is interrupted
   603      * @throws InterruptedException if the current thread is interrupted
   802      */
   604      */
   803     public final boolean tryAcquireNanos(long arg, long nanosTimeout)
   605     public final boolean tryAcquireNanos(long arg, long nanosTimeout)
   804             throws InterruptedException {
   606         throws InterruptedException {
   805         if (Thread.interrupted())
   607         if (!Thread.interrupted()) {
   806             throw new InterruptedException();
   608             if (tryAcquire(arg))
   807         return tryAcquire(arg) ||
   609                 return true;
   808             doAcquireNanos(arg, nanosTimeout);
   610             if (nanosTimeout <= 0L)
       
   611                 return false;
       
   612             int stat = acquire(null, arg, false, true, true,
       
   613                                System.nanoTime() + nanosTimeout);
       
   614             if (stat > 0)
       
   615                 return true;
       
   616             if (stat == 0)
       
   617                 return false;
       
   618         }
       
   619         throw new InterruptedException();
   809     }
   620     }
   810 
   621 
   811     /**
   622     /**
   812      * Releases in exclusive mode.  Implemented by unblocking one or
   623      * Releases in exclusive mode.  Implemented by unblocking one or
   813      * more threads if {@link #tryRelease} returns true.
   624      * more threads if {@link #tryRelease} returns true.
   818      *        can represent anything you like.
   629      *        can represent anything you like.
   819      * @return the value returned from {@link #tryRelease}
   630      * @return the value returned from {@link #tryRelease}
   820      */
   631      */
   821     public final boolean release(long arg) {
   632     public final boolean release(long arg) {
   822         if (tryRelease(arg)) {
   633         if (tryRelease(arg)) {
   823             Node h = head;
   634             signalNext(head);
   824             if (h != null && h.waitStatus != 0)
       
   825                 unparkSuccessor(h);
       
   826             return true;
   635             return true;
   827         }
   636         }
   828         return false;
   637         return false;
   829     }
   638     }
   830 
   639 
   839      *        {@link #tryAcquireShared} but is otherwise uninterpreted
   648      *        {@link #tryAcquireShared} but is otherwise uninterpreted
   840      *        and can represent anything you like.
   649      *        and can represent anything you like.
   841      */
   650      */
   842     public final void acquireShared(long arg) {
   651     public final void acquireShared(long arg) {
   843         if (tryAcquireShared(arg) < 0)
   652         if (tryAcquireShared(arg) < 0)
   844             doAcquireShared(arg);
   653             acquire(null, arg, true, false, false, 0L);
   845     }
   654     }
   846 
   655 
   847     /**
   656     /**
   848      * Acquires in shared mode, aborting if interrupted.  Implemented
   657      * Acquires in shared mode, aborting if interrupted.  Implemented
   849      * by first checking interrupt status, then invoking at least once
   658      * by first checking interrupt status, then invoking at least once
   856      * otherwise uninterpreted and can represent anything
   665      * otherwise uninterpreted and can represent anything
   857      * you like.
   666      * you like.
   858      * @throws InterruptedException if the current thread is interrupted
   667      * @throws InterruptedException if the current thread is interrupted
   859      */
   668      */
   860     public final void acquireSharedInterruptibly(long arg)
   669     public final void acquireSharedInterruptibly(long arg)
   861             throws InterruptedException {
   670         throws InterruptedException {
   862         if (Thread.interrupted())
   671         if (Thread.interrupted() ||
       
   672             (tryAcquireShared(arg) < 0 &&
       
   673              acquire(null, arg, true, true, false, 0L) < 0))
   863             throw new InterruptedException();
   674             throw new InterruptedException();
   864         if (tryAcquireShared(arg) < 0)
       
   865             doAcquireSharedInterruptibly(arg);
       
   866     }
   675     }
   867 
   676 
   868     /**
   677     /**
   869      * Attempts to acquire in shared mode, aborting if interrupted, and
   678      * Attempts to acquire in shared mode, aborting if interrupted, and
   870      * failing if the given timeout elapses.  Implemented by first
   679      * failing if the given timeout elapses.  Implemented by first
   881      * @return {@code true} if acquired; {@code false} if timed out
   690      * @return {@code true} if acquired; {@code false} if timed out
   882      * @throws InterruptedException if the current thread is interrupted
   691      * @throws InterruptedException if the current thread is interrupted
   883      */
   692      */
   884     public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
   693     public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
   885             throws InterruptedException {
   694             throws InterruptedException {
   886         if (Thread.interrupted())
   695         if (!Thread.interrupted()) {
   887             throw new InterruptedException();
   696             if (tryAcquireShared(arg) >= 0)
   888         return tryAcquireShared(arg) >= 0 ||
   697                 return true;
   889             doAcquireSharedNanos(arg, nanosTimeout);
   698             if (nanosTimeout <= 0L)
       
   699                 return false;
       
   700             int stat = acquire(null, arg, true, true, true,
       
   701                                System.nanoTime() + nanosTimeout);
       
   702             if (stat > 0)
       
   703                 return true;
       
   704             if (stat == 0)
       
   705                 return false;
       
   706         }
       
   707         throw new InterruptedException();
   890     }
   708     }
   891 
   709 
   892     /**
   710     /**
   893      * Releases in shared mode.  Implemented by unblocking one or more
   711      * Releases in shared mode.  Implemented by unblocking one or more
   894      * threads if {@link #tryReleaseShared} returns true.
   712      * threads if {@link #tryReleaseShared} returns true.
   898      *        and can represent anything you like.
   716      *        and can represent anything you like.
   899      * @return the value returned from {@link #tryReleaseShared}
   717      * @return the value returned from {@link #tryReleaseShared}
   900      */
   718      */
   901     public final boolean releaseShared(long arg) {
   719     public final boolean releaseShared(long arg) {
   902         if (tryReleaseShared(arg)) {
   720         if (tryReleaseShared(arg)) {
   903             doReleaseShared();
   721             signalNext(head);
   904             return true;
   722             return true;
   905         }
   723         }
   906         return false;
   724         return false;
   907     }
   725     }
   908 
   726 
   916      *
   734      *
   917      * @return {@code true} if there may be other threads waiting to acquire
   735      * @return {@code true} if there may be other threads waiting to acquire
   918      */
   736      */
   919     public final boolean hasQueuedThreads() {
   737     public final boolean hasQueuedThreads() {
   920         for (Node p = tail, h = head; p != h && p != null; p = p.prev)
   738         for (Node p = tail, h = head; p != h && p != null; p = p.prev)
   921             if (p.waitStatus <= 0)
   739             if (p.status >= 0)
   922                 return true;
   740                 return true;
   923         return false;
   741         return false;
   924     }
   742     }
   925 
   743 
   926     /**
   744     /**
   946      *
   764      *
   947      * @return the first (longest-waiting) thread in the queue, or
   765      * @return the first (longest-waiting) thread in the queue, or
   948      *         {@code null} if no threads are currently queued
   766      *         {@code null} if no threads are currently queued
   949      */
   767      */
   950     public final Thread getFirstQueuedThread() {
   768     public final Thread getFirstQueuedThread() {
   951         // handle only fast path, else relay
   769         Thread first = null, w; Node h, s;
   952         return (head == tail) ? null : fullGetFirstQueuedThread();
   770         if ((h = head) != null && ((s = h.next) == null ||
   953     }
   771                                    (first = s.waiter) == null ||
   954 
   772                                    s.prev == null)) {
   955     /**
   773             // traverse from tail on stale reads
   956      * Version of getFirstQueuedThread called when fastpath fails.
   774             for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
   957      */
   775                 if ((w = p.waiter) != null)
   958     private Thread fullGetFirstQueuedThread() {
   776                     first = w;
   959         /*
   777         }
   960          * The first node is normally head.next. Try to get its
   778         return first;
   961          * thread field, ensuring consistent reads: If thread
       
   962          * field is nulled out or s.prev is no longer head, then
       
   963          * some other thread(s) concurrently performed setHead in
       
   964          * between some of our reads. We try this twice before
       
   965          * resorting to traversal.
       
   966          */
       
   967         Node h, s;
       
   968         Thread st;
       
   969         if (((h = head) != null && (s = h.next) != null &&
       
   970              s.prev == head && (st = s.thread) != null) ||
       
   971             ((h = head) != null && (s = h.next) != null &&
       
   972              s.prev == head && (st = s.thread) != null))
       
   973             return st;
       
   974 
       
   975         /*
       
   976          * Head's next field might not have been set yet, or may have
       
   977          * been unset after setHead. So we must check to see if tail
       
   978          * is actually first node. If not, we continue on, safely
       
   979          * traversing from tail back to head to find first,
       
   980          * guaranteeing termination.
       
   981          */
       
   982 
       
   983         Thread firstThread = null;
       
   984         for (Node p = tail; p != null && p != head; p = p.prev) {
       
   985             Thread t = p.thread;
       
   986             if (t != null)
       
   987                 firstThread = t;
       
   988         }
       
   989         return firstThread;
       
   990     }
   779     }
   991 
   780 
   992     /**
   781     /**
   993      * Returns true if the given thread is currently queued.
   782      * Returns true if the given thread is currently queued.
   994      *
   783      *
  1001      */
   790      */
  1002     public final boolean isQueued(Thread thread) {
   791     public final boolean isQueued(Thread thread) {
  1003         if (thread == null)
   792         if (thread == null)
  1004             throw new NullPointerException();
   793             throw new NullPointerException();
  1005         for (Node p = tail; p != null; p = p.prev)
   794         for (Node p = tail; p != null; p = p.prev)
  1006             if (p.thread == thread)
   795             if (p.waiter == thread)
  1007                 return true;
   796                 return true;
  1008         return false;
   797         return false;
  1009     }
   798     }
  1010 
   799 
  1011     /**
   800     /**
  1017      * is not the first queued thread.  Used only as a heuristic in
   806      * is not the first queued thread.  Used only as a heuristic in
  1018      * ReentrantReadWriteLock.
   807      * ReentrantReadWriteLock.
  1019      */
   808      */
  1020     final boolean apparentlyFirstQueuedIsExclusive() {
   809     final boolean apparentlyFirstQueuedIsExclusive() {
  1021         Node h, s;
   810         Node h, s;
  1022         return (h = head) != null &&
   811         return (h = head) != null && (s = h.next)  != null &&
  1023             (s = h.next)  != null &&
   812             !(s instanceof SharedNode) && s.waiter != null;
  1024             !s.isShared()         &&
       
  1025             s.thread != null;
       
  1026     }
   813     }
  1027 
   814 
  1028     /**
   815     /**
  1029      * Queries whether any threads have been waiting to acquire longer
   816      * Queries whether any threads have been waiting to acquire longer
  1030      * than the current thread.
   817      * than the current thread.
  1050      * (unless this is a reentrant acquire).  For example, the {@code
   837      * (unless this is a reentrant acquire).  For example, the {@code
  1051      * tryAcquire} method for a fair, reentrant, exclusive mode
   838      * tryAcquire} method for a fair, reentrant, exclusive mode
  1052      * synchronizer might look like this:
   839      * synchronizer might look like this:
  1053      *
   840      *
  1054      * <pre> {@code
   841      * <pre> {@code
  1055      * protected boolean tryAcquire(int arg) {
   842      * protected boolean tryAcquire(long arg) {
  1056      *   if (isHeldExclusively()) {
   843      *   if (isHeldExclusively()) {
  1057      *     // A reentrant acquire; increment hold count
   844      *     // A reentrant acquire; increment hold count
  1058      *     return true;
   845      *     return true;
  1059      *   } else if (hasQueuedPredecessors()) {
   846      *   } else if (hasQueuedPredecessors()) {
  1060      *     return false;
   847      *     return false;
  1067      *         current thread, and {@code false} if the current thread
   854      *         current thread, and {@code false} if the current thread
  1068      *         is at the head of the queue or the queue is empty
   855      *         is at the head of the queue or the queue is empty
  1069      * @since 1.7
   856      * @since 1.7
  1070      */
   857      */
  1071     public final boolean hasQueuedPredecessors() {
   858     public final boolean hasQueuedPredecessors() {
  1072         Node h, s;
   859         Thread first = null; Node h, s;
  1073         if ((h = head) != null) {
   860         if ((h = head) != null && ((s = h.next) == null ||
  1074             if ((s = h.next) == null || s.waitStatus > 0) {
   861                                    (first = s.waiter) == null ||
  1075                 s = null; // traverse in case of concurrent cancellation
   862                                    s.prev == null))
  1076                 for (Node p = tail; p != h && p != null; p = p.prev) {
   863             first = getFirstQueuedThread(); // retry via getFirstQueuedThread
  1077                     if (p.waitStatus <= 0)
   864         return first != null && first != Thread.currentThread();
  1078                         s = p;
       
  1079                 }
       
  1080             }
       
  1081             if (s != null && s.thread != Thread.currentThread())
       
  1082                 return true;
       
  1083         }
       
  1084         return false;
       
  1085     }
   865     }
  1086 
   866 
  1087     // Instrumentation and monitoring methods
   867     // Instrumentation and monitoring methods
  1088 
   868 
  1089     /**
   869     /**
  1096      * @return the estimated number of threads waiting to acquire
   876      * @return the estimated number of threads waiting to acquire
  1097      */
   877      */
  1098     public final int getQueueLength() {
   878     public final int getQueueLength() {
  1099         int n = 0;
   879         int n = 0;
  1100         for (Node p = tail; p != null; p = p.prev) {
   880         for (Node p = tail; p != null; p = p.prev) {
  1101             if (p.thread != null)
   881             if (p.waiter != null)
  1102                 ++n;
   882                 ++n;
  1103         }
   883         }
  1104         return n;
   884         return n;
  1105     }
   885     }
  1106 
   886 
  1116      * @return the collection of threads
   896      * @return the collection of threads
  1117      */
   897      */
  1118     public final Collection<Thread> getQueuedThreads() {
   898     public final Collection<Thread> getQueuedThreads() {
  1119         ArrayList<Thread> list = new ArrayList<>();
   899         ArrayList<Thread> list = new ArrayList<>();
  1120         for (Node p = tail; p != null; p = p.prev) {
   900         for (Node p = tail; p != null; p = p.prev) {
  1121             Thread t = p.thread;
   901             Thread t = p.waiter;
  1122             if (t != null)
   902             if (t != null)
  1123                 list.add(t);
   903                 list.add(t);
  1124         }
   904         }
  1125         return list;
   905         return list;
  1126     }
   906     }
  1134      * @return the collection of threads
   914      * @return the collection of threads
  1135      */
   915      */
  1136     public final Collection<Thread> getExclusiveQueuedThreads() {
   916     public final Collection<Thread> getExclusiveQueuedThreads() {
  1137         ArrayList<Thread> list = new ArrayList<>();
   917         ArrayList<Thread> list = new ArrayList<>();
  1138         for (Node p = tail; p != null; p = p.prev) {
   918         for (Node p = tail; p != null; p = p.prev) {
  1139             if (!p.isShared()) {
   919             if (!(p instanceof SharedNode)) {
  1140                 Thread t = p.thread;
   920                 Thread t = p.waiter;
  1141                 if (t != null)
   921                 if (t != null)
  1142                     list.add(t);
   922                     list.add(t);
  1143             }
   923             }
  1144         }
   924         }
  1145         return list;
   925         return list;
  1154      * @return the collection of threads
   934      * @return the collection of threads
  1155      */
   935      */
  1156     public final Collection<Thread> getSharedQueuedThreads() {
   936     public final Collection<Thread> getSharedQueuedThreads() {
  1157         ArrayList<Thread> list = new ArrayList<>();
   937         ArrayList<Thread> list = new ArrayList<>();
  1158         for (Node p = tail; p != null; p = p.prev) {
   938         for (Node p = tail; p != null; p = p.prev) {
  1159             if (p.isShared()) {
   939             if (p instanceof SharedNode) {
  1160                 Thread t = p.thread;
   940                 Thread t = p.waiter;
  1161                 if (t != null)
   941                 if (t != null)
  1162                     list.add(t);
   942                     list.add(t);
  1163             }
   943             }
  1164         }
   944         }
  1165         return list;
   945         return list;
  1176      */
   956      */
  1177     public String toString() {
   957     public String toString() {
  1178         return super.toString()
   958         return super.toString()
  1179             + "[State = " + getState() + ", "
   959             + "[State = " + getState() + ", "
  1180             + (hasQueuedThreads() ? "non" : "") + "empty queue]";
   960             + (hasQueuedThreads() ? "non" : "") + "empty queue]";
  1181     }
       
  1182 
       
  1183 
       
  1184     // Internal support methods for Conditions
       
  1185 
       
  1186     /**
       
  1187      * Returns true if a node, always one that was initially placed on
       
  1188      * a condition queue, is now waiting to reacquire on sync queue.
       
  1189      * @param node the node
       
  1190      * @return true if is reacquiring
       
  1191      */
       
  1192     final boolean isOnSyncQueue(Node node) {
       
  1193         if (node.waitStatus == Node.CONDITION || node.prev == null)
       
  1194             return false;
       
  1195         if (node.next != null) // If has successor, it must be on queue
       
  1196             return true;
       
  1197         /*
       
  1198          * node.prev can be non-null, but not yet on queue because
       
  1199          * the CAS to place it on queue can fail. So we have to
       
  1200          * traverse from tail to make sure it actually made it.  It
       
  1201          * will always be near the tail in calls to this method, and
       
  1202          * unless the CAS failed (which is unlikely), it will be
       
  1203          * there, so we hardly ever traverse much.
       
  1204          */
       
  1205         return findNodeFromTail(node);
       
  1206     }
       
  1207 
       
  1208     /**
       
  1209      * Returns true if node is on sync queue by searching backwards from tail.
       
  1210      * Called only when needed by isOnSyncQueue.
       
  1211      * @return true if present
       
  1212      */
       
  1213     private boolean findNodeFromTail(Node node) {
       
  1214         // We check for node first, since it's likely to be at or near tail.
       
  1215         // tail is known to be non-null, so we could re-order to "save"
       
  1216         // one null check, but we leave it this way to help the VM.
       
  1217         for (Node p = tail;;) {
       
  1218             if (p == node)
       
  1219                 return true;
       
  1220             if (p == null)
       
  1221                 return false;
       
  1222             p = p.prev;
       
  1223         }
       
  1224     }
       
  1225 
       
  1226     /**
       
  1227      * Transfers a node from a condition queue onto sync queue.
       
  1228      * Returns true if successful.
       
  1229      * @param node the node
       
  1230      * @return true if successfully transferred (else the node was
       
  1231      * cancelled before signal)
       
  1232      */
       
  1233     final boolean transferForSignal(Node node) {
       
  1234         /*
       
  1235          * If cannot change waitStatus, the node has been cancelled.
       
  1236          */
       
  1237         if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
       
  1238             return false;
       
  1239 
       
  1240         /*
       
  1241          * Splice onto queue and try to set waitStatus of predecessor to
       
  1242          * indicate that thread is (probably) waiting. If cancelled or
       
  1243          * attempt to set waitStatus fails, wake up to resync (in which
       
  1244          * case the waitStatus can be transiently and harmlessly wrong).
       
  1245          */
       
  1246         Node p = enq(node);
       
  1247         int ws = p.waitStatus;
       
  1248         if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
       
  1249             LockSupport.unpark(node.thread);
       
  1250         return true;
       
  1251     }
       
  1252 
       
  1253     /**
       
  1254      * Transfers node, if necessary, to sync queue after a cancelled wait.
       
  1255      * Returns true if thread was cancelled before being signalled.
       
  1256      *
       
  1257      * @param node the node
       
  1258      * @return true if cancelled before the node was signalled
       
  1259      */
       
  1260     final boolean transferAfterCancelledWait(Node node) {
       
  1261         if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
       
  1262             enq(node);
       
  1263             return true;
       
  1264         }
       
  1265         /*
       
  1266          * If we lost out to a signal(), then we can't proceed
       
  1267          * until it finishes its enq().  Cancelling during an
       
  1268          * incomplete transfer is both rare and transient, so just
       
  1269          * spin.
       
  1270          */
       
  1271         while (!isOnSyncQueue(node))
       
  1272             Thread.yield();
       
  1273         return false;
       
  1274     }
       
  1275 
       
  1276     /**
       
  1277      * Invokes release with current state value; returns saved state.
       
  1278      * Cancels node and throws exception on failure.
       
  1279      * @param node the condition node for this wait
       
  1280      * @return previous sync state
       
  1281      */
       
  1282     final long fullyRelease(Node node) {
       
  1283         try {
       
  1284             long savedState = getState();
       
  1285             if (release(savedState))
       
  1286                 return savedState;
       
  1287             throw new IllegalMonitorStateException();
       
  1288         } catch (Throwable t) {
       
  1289             node.waitStatus = Node.CANCELLED;
       
  1290             throw t;
       
  1291         }
       
  1292     }
   961     }
  1293 
   962 
  1294     // Instrumentation methods for conditions
   963     // Instrumentation methods for conditions
  1295 
   964 
  1296     /**
   965     /**
  1382      * condition semantics that rely on those of the associated
  1051      * condition semantics that rely on those of the associated
  1383      * {@code AbstractQueuedLongSynchronizer}.
  1052      * {@code AbstractQueuedLongSynchronizer}.
  1384      *
  1053      *
  1385      * <p>This class is Serializable, but all fields are transient,
  1054      * <p>This class is Serializable, but all fields are transient,
  1386      * so deserialized conditions have no waiters.
  1055      * so deserialized conditions have no waiters.
  1387      *
       
  1388      * @since 1.6
       
  1389      */
  1056      */
  1390     public class ConditionObject implements Condition, java.io.Serializable {
  1057     public class ConditionObject implements Condition, java.io.Serializable {
  1391         private static final long serialVersionUID = 1173984872572414699L;
  1058         private static final long serialVersionUID = 1173984872572414699L;
  1392         /** First node of condition queue. */
  1059         /** First node of condition queue. */
  1393         private transient Node firstWaiter;
  1060         private transient ConditionNode firstWaiter;
  1394         /** Last node of condition queue. */
  1061         /** Last node of condition queue. */
  1395         private transient Node lastWaiter;
  1062         private transient ConditionNode lastWaiter;
  1396 
  1063 
  1397         /**
  1064         /**
  1398          * Creates a new {@code ConditionObject} instance.
  1065          * Creates a new {@code ConditionObject} instance.
  1399          */
  1066          */
  1400         public ConditionObject() { }
  1067         public ConditionObject() { }
  1401 
  1068 
  1402         // Internal methods
  1069         // Signalling methods
  1403 
  1070 
  1404         /**
  1071         /**
  1405          * Adds a new waiter to wait queue.
  1072          * Removes and transfers one or all waiters to sync queue.
  1406          * @return its new wait node
  1073          */
  1407          */
  1074         private void doSignal(ConditionNode first, boolean all) {
  1408         private Node addConditionWaiter() {
  1075             while (first != null) {
  1409             if (!isHeldExclusively())
  1076                 ConditionNode next = first.nextWaiter;
  1410                 throw new IllegalMonitorStateException();
  1077                 if ((firstWaiter = next) == null)
  1411             Node t = lastWaiter;
       
  1412             // If lastWaiter is cancelled, clean out.
       
  1413             if (t != null && t.waitStatus != Node.CONDITION) {
       
  1414                 unlinkCancelledWaiters();
       
  1415                 t = lastWaiter;
       
  1416             }
       
  1417 
       
  1418             Node node = new Node(Node.CONDITION);
       
  1419 
       
  1420             if (t == null)
       
  1421                 firstWaiter = node;
       
  1422             else
       
  1423                 t.nextWaiter = node;
       
  1424             lastWaiter = node;
       
  1425             return node;
       
  1426         }
       
  1427 
       
  1428         /**
       
  1429          * Removes and transfers nodes until hit non-cancelled one or
       
  1430          * null. Split out from signal in part to encourage compilers
       
  1431          * to inline the case of no waiters.
       
  1432          * @param first (non-null) the first node on condition queue
       
  1433          */
       
  1434         private void doSignal(Node first) {
       
  1435             do {
       
  1436                 if ( (firstWaiter = first.nextWaiter) == null)
       
  1437                     lastWaiter = null;
  1078                     lastWaiter = null;
  1438                 first.nextWaiter = null;
  1079                 if ((first.getAndUnsetStatus(COND) & COND) != 0) {
  1439             } while (!transferForSignal(first) &&
  1080                     enqueue(first);
  1440                      (first = firstWaiter) != null);
  1081                     if (!all)
  1441         }
  1082                         break;
  1442 
  1083                 }
  1443         /**
       
  1444          * Removes and transfers all nodes.
       
  1445          * @param first (non-null) the first node on condition queue
       
  1446          */
       
  1447         private void doSignalAll(Node first) {
       
  1448             lastWaiter = firstWaiter = null;
       
  1449             do {
       
  1450                 Node next = first.nextWaiter;
       
  1451                 first.nextWaiter = null;
       
  1452                 transferForSignal(first);
       
  1453                 first = next;
  1084                 first = next;
  1454             } while (first != null);
  1085             }
  1455         }
  1086         }
  1456 
       
  1457         /**
       
  1458          * Unlinks cancelled waiter nodes from condition queue.
       
  1459          * Called only while holding lock. This is called when
       
  1460          * cancellation occurred during condition wait, and upon
       
  1461          * insertion of a new waiter when lastWaiter is seen to have
       
  1462          * been cancelled. This method is needed to avoid garbage
       
  1463          * retention in the absence of signals. So even though it may
       
  1464          * require a full traversal, it comes into play only when
       
  1465          * timeouts or cancellations occur in the absence of
       
  1466          * signals. It traverses all nodes rather than stopping at a
       
  1467          * particular target to unlink all pointers to garbage nodes
       
  1468          * without requiring many re-traversals during cancellation
       
  1469          * storms.
       
  1470          */
       
  1471         private void unlinkCancelledWaiters() {
       
  1472             Node t = firstWaiter;
       
  1473             Node trail = null;
       
  1474             while (t != null) {
       
  1475                 Node next = t.nextWaiter;
       
  1476                 if (t.waitStatus != Node.CONDITION) {
       
  1477                     t.nextWaiter = null;
       
  1478                     if (trail == null)
       
  1479                         firstWaiter = next;
       
  1480                     else
       
  1481                         trail.nextWaiter = next;
       
  1482                     if (next == null)
       
  1483                         lastWaiter = trail;
       
  1484                 }
       
  1485                 else
       
  1486                     trail = t;
       
  1487                 t = next;
       
  1488             }
       
  1489         }
       
  1490 
       
  1491         // public methods
       
  1492 
  1087 
  1493         /**
  1088         /**
  1494          * Moves the longest-waiting thread, if one exists, from the
  1089          * Moves the longest-waiting thread, if one exists, from the
  1495          * wait queue for this condition to the wait queue for the
  1090          * wait queue for this condition to the wait queue for the
  1496          * owning lock.
  1091          * owning lock.
  1497          *
  1092          *
  1498          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1093          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1499          *         returns {@code false}
  1094          *         returns {@code false}
  1500          */
  1095          */
  1501         public final void signal() {
  1096         public final void signal() {
       
  1097             ConditionNode first = firstWaiter;
  1502             if (!isHeldExclusively())
  1098             if (!isHeldExclusively())
  1503                 throw new IllegalMonitorStateException();
  1099                 throw new IllegalMonitorStateException();
  1504             Node first = firstWaiter;
       
  1505             if (first != null)
  1100             if (first != null)
  1506                 doSignal(first);
  1101                 doSignal(first, false);
  1507         }
  1102         }
  1508 
  1103 
  1509         /**
  1104         /**
  1510          * Moves all threads from the wait queue for this condition to
  1105          * Moves all threads from the wait queue for this condition to
  1511          * the wait queue for the owning lock.
  1106          * the wait queue for the owning lock.
  1512          *
  1107          *
  1513          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1108          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1514          *         returns {@code false}
  1109          *         returns {@code false}
  1515          */
  1110          */
  1516         public final void signalAll() {
  1111         public final void signalAll() {
       
  1112             ConditionNode first = firstWaiter;
  1517             if (!isHeldExclusively())
  1113             if (!isHeldExclusively())
  1518                 throw new IllegalMonitorStateException();
  1114                 throw new IllegalMonitorStateException();
  1519             Node first = firstWaiter;
       
  1520             if (first != null)
  1115             if (first != null)
  1521                 doSignalAll(first);
  1116                 doSignal(first, true);
       
  1117         }
       
  1118 
       
  1119         // Waiting methods
       
  1120 
       
  1121         /**
       
  1122          * Adds node to condition list and releases lock.
       
  1123          *
       
  1124          * @param node the node
       
  1125          * @return savedState to reacquire after wait
       
  1126          */
       
  1127         private long enableWait(ConditionNode node) {
       
  1128             if (isHeldExclusively()) {
       
  1129                 node.waiter = Thread.currentThread();
       
  1130                 node.setStatusRelaxed(COND | WAITING);
       
  1131                 ConditionNode last = lastWaiter;
       
  1132                 if (last == null)
       
  1133                     firstWaiter = node;
       
  1134                 else
       
  1135                     last.nextWaiter = node;
       
  1136                 lastWaiter = node;
       
  1137                 long savedState = getState();
       
  1138                 if (release(savedState))
       
  1139                     return savedState;
       
  1140             }
       
  1141             node.status = CANCELLED; // lock not held or inconsistent
       
  1142             throw new IllegalMonitorStateException();
       
  1143         }
       
  1144 
       
  1145         /**
       
  1146          * Returns true if a node that was initially placed on a condition
       
  1147          * queue is now ready to reacquire on sync queue.
       
  1148          * @param node the node
       
  1149          * @return true if is reacquiring
       
  1150          */
       
  1151         private boolean canReacquire(ConditionNode node) {
       
  1152             // check links, not status to avoid enqueue race
       
  1153             return node != null && node.prev != null && isEnqueued(node);
       
  1154         }
       
  1155 
       
  1156         /**
       
  1157          * Unlinks the given node and other non-waiting nodes from
       
  1158          * condition queue unless already unlinked.
       
  1159          */
       
  1160         private void unlinkCancelledWaiters(ConditionNode node) {
       
  1161             if (node == null || node.nextWaiter != null || node == lastWaiter) {
       
  1162                 ConditionNode w = firstWaiter, trail = null;
       
  1163                 while (w != null) {
       
  1164                     ConditionNode next = w.nextWaiter;
       
  1165                     if ((w.status & COND) == 0) {
       
  1166                         w.nextWaiter = null;
       
  1167                         if (trail == null)
       
  1168                             firstWaiter = next;
       
  1169                         else
       
  1170                             trail.nextWaiter = next;
       
  1171                         if (next == null)
       
  1172                             lastWaiter = trail;
       
  1173                     } else
       
  1174                         trail = w;
       
  1175                     w = next;
       
  1176                 }
       
  1177             }
  1522         }
  1178         }
  1523 
  1179 
  1524         /**
  1180         /**
  1525          * Implements uninterruptible condition wait.
  1181          * Implements uninterruptible condition wait.
  1526          * <ol>
  1182          * <ol>
  1531          * <li>Reacquire by invoking specialized version of
  1187          * <li>Reacquire by invoking specialized version of
  1532          *     {@link #acquire} with saved state as argument.
  1188          *     {@link #acquire} with saved state as argument.
  1533          * </ol>
  1189          * </ol>
  1534          */
  1190          */
  1535         public final void awaitUninterruptibly() {
  1191         public final void awaitUninterruptibly() {
  1536             Node node = addConditionWaiter();
  1192             ConditionNode node = new ConditionNode();
  1537             long savedState = fullyRelease(node);
  1193             long savedState = enableWait(node);
       
  1194             LockSupport.setCurrentBlocker(this); // for back-compatibility
  1538             boolean interrupted = false;
  1195             boolean interrupted = false;
  1539             while (!isOnSyncQueue(node)) {
  1196             while (!canReacquire(node)) {
  1540                 LockSupport.park(this);
       
  1541                 if (Thread.interrupted())
  1197                 if (Thread.interrupted())
  1542                     interrupted = true;
  1198                     interrupted = true;
  1543             }
  1199                 else if ((node.status & COND) != 0) {
  1544             if (acquireQueued(node, savedState) || interrupted)
  1200                     try {
  1545                 selfInterrupt();
  1201                         ForkJoinPool.managedBlock(node);
  1546         }
  1202                     } catch (InterruptedException ie) {
  1547 
  1203                         interrupted = true;
  1548         /*
  1204                     }
  1549          * For interruptible waits, we need to track whether to throw
  1205                 } else
  1550          * InterruptedException, if interrupted while blocked on
  1206                     Thread.onSpinWait();    // awoke while enqueuing
  1551          * condition, versus reinterrupt current thread, if
  1207             }
  1552          * interrupted while blocked waiting to re-acquire.
  1208             LockSupport.setCurrentBlocker(null);
  1553          */
  1209             node.clearStatus();
  1554 
  1210             acquire(node, savedState, false, false, false, 0L);
  1555         /** Mode meaning to reinterrupt on exit from wait */
  1211             if (interrupted)
  1556         private static final int REINTERRUPT =  1;
  1212                 Thread.currentThread().interrupt();
  1557         /** Mode meaning to throw InterruptedException on exit from wait */
       
  1558         private static final int THROW_IE    = -1;
       
  1559 
       
  1560         /**
       
  1561          * Checks for interrupt, returning THROW_IE if interrupted
       
  1562          * before signalled, REINTERRUPT if after signalled, or
       
  1563          * 0 if not interrupted.
       
  1564          */
       
  1565         private int checkInterruptWhileWaiting(Node node) {
       
  1566             return Thread.interrupted() ?
       
  1567                 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
       
  1568                 0;
       
  1569         }
       
  1570 
       
  1571         /**
       
  1572          * Throws InterruptedException, reinterrupts current thread, or
       
  1573          * does nothing, depending on mode.
       
  1574          */
       
  1575         private void reportInterruptAfterWait(int interruptMode)
       
  1576             throws InterruptedException {
       
  1577             if (interruptMode == THROW_IE)
       
  1578                 throw new InterruptedException();
       
  1579             else if (interruptMode == REINTERRUPT)
       
  1580                 selfInterrupt();
       
  1581         }
  1213         }
  1582 
  1214 
  1583         /**
  1215         /**
  1584          * Implements interruptible condition wait.
  1216          * Implements interruptible condition wait.
  1585          * <ol>
  1217          * <ol>
  1594          * </ol>
  1226          * </ol>
  1595          */
  1227          */
  1596         public final void await() throws InterruptedException {
  1228         public final void await() throws InterruptedException {
  1597             if (Thread.interrupted())
  1229             if (Thread.interrupted())
  1598                 throw new InterruptedException();
  1230                 throw new InterruptedException();
  1599             Node node = addConditionWaiter();
  1231             ConditionNode node = new ConditionNode();
  1600             long savedState = fullyRelease(node);
  1232             long savedState = enableWait(node);
  1601             int interruptMode = 0;
  1233             LockSupport.setCurrentBlocker(this); // for back-compatibility
  1602             while (!isOnSyncQueue(node)) {
  1234             boolean interrupted = false, cancelled = false;
  1603                 LockSupport.park(this);
  1235             while (!canReacquire(node)) {
  1604                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  1236                 if (interrupted |= Thread.interrupted()) {
  1605                     break;
  1237                     if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
  1606             }
  1238                         break;              // else interrupted after signal
  1607             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1239                 } else if ((node.status & COND) != 0) {
  1608                 interruptMode = REINTERRUPT;
  1240                     try {
  1609             if (node.nextWaiter != null) // clean up if cancelled
  1241                         ForkJoinPool.managedBlock(node);
  1610                 unlinkCancelledWaiters();
  1242                     } catch (InterruptedException ie) {
  1611             if (interruptMode != 0)
  1243                         interrupted = true;
  1612                 reportInterruptAfterWait(interruptMode);
  1244                     }
       
  1245                 } else
       
  1246                     Thread.onSpinWait();    // awoke while enqueuing
       
  1247             }
       
  1248             LockSupport.setCurrentBlocker(null);
       
  1249             node.clearStatus();
       
  1250             acquire(node, savedState, false, false, false, 0L);
       
  1251             if (interrupted) {
       
  1252                 if (cancelled) {
       
  1253                     unlinkCancelledWaiters(node);
       
  1254                     throw new InterruptedException();
       
  1255                 }
       
  1256                 Thread.currentThread().interrupt();
       
  1257             }
  1613         }
  1258         }
  1614 
  1259 
  1615         /**
  1260         /**
  1616          * Implements timed condition wait.
  1261          * Implements timed condition wait.
  1617          * <ol>
  1262          * <ol>
  1627          */
  1272          */
  1628         public final long awaitNanos(long nanosTimeout)
  1273         public final long awaitNanos(long nanosTimeout)
  1629                 throws InterruptedException {
  1274                 throws InterruptedException {
  1630             if (Thread.interrupted())
  1275             if (Thread.interrupted())
  1631                 throw new InterruptedException();
  1276                 throw new InterruptedException();
  1632             // We don't check for nanosTimeout <= 0L here, to allow
  1277             ConditionNode node = new ConditionNode();
  1633             // awaitNanos(0) as a way to "yield the lock".
  1278             long savedState = enableWait(node);
  1634             final long deadline = System.nanoTime() + nanosTimeout;
  1279             long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
  1635             long initialNanos = nanosTimeout;
  1280             long deadline = System.nanoTime() + nanos;
  1636             Node node = addConditionWaiter();
  1281             boolean cancelled = false, interrupted = false;
  1637             long savedState = fullyRelease(node);
  1282             while (!canReacquire(node)) {
  1638             int interruptMode = 0;
  1283                 if ((interrupted |= Thread.interrupted()) ||
  1639             while (!isOnSyncQueue(node)) {
  1284                     (nanos = deadline - System.nanoTime()) <= 0L) {
  1640                 if (nanosTimeout <= 0L) {
  1285                     if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
  1641                     transferAfterCancelledWait(node);
  1286                         break;
  1642                     break;
  1287                 } else
  1643                 }
  1288                     LockSupport.parkNanos(this, nanos);
  1644                 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
  1289             }
  1645                     LockSupport.parkNanos(this, nanosTimeout);
  1290             node.clearStatus();
  1646                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  1291             acquire(node, savedState, false, false, false, 0L);
  1647                     break;
  1292             if (cancelled) {
  1648                 nanosTimeout = deadline - System.nanoTime();
  1293                 unlinkCancelledWaiters(node);
  1649             }
  1294                 if (interrupted)
  1650             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1295                     throw new InterruptedException();
  1651                 interruptMode = REINTERRUPT;
  1296             } else if (interrupted)
  1652             if (node.nextWaiter != null)
  1297                 Thread.currentThread().interrupt();
  1653                 unlinkCancelledWaiters();
       
  1654             if (interruptMode != 0)
       
  1655                 reportInterruptAfterWait(interruptMode);
       
  1656             long remaining = deadline - System.nanoTime(); // avoid overflow
  1298             long remaining = deadline - System.nanoTime(); // avoid overflow
  1657             return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
  1299             return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
  1658         }
  1300         }
  1659 
  1301 
  1660         /**
  1302         /**
  1661          * Implements absolute timed condition wait.
  1303          * Implements absolute timed condition wait.
  1662          * <ol>
  1304          * <ol>
  1674         public final boolean awaitUntil(Date deadline)
  1316         public final boolean awaitUntil(Date deadline)
  1675                 throws InterruptedException {
  1317                 throws InterruptedException {
  1676             long abstime = deadline.getTime();
  1318             long abstime = deadline.getTime();
  1677             if (Thread.interrupted())
  1319             if (Thread.interrupted())
  1678                 throw new InterruptedException();
  1320                 throw new InterruptedException();
  1679             Node node = addConditionWaiter();
  1321             ConditionNode node = new ConditionNode();
  1680             long savedState = fullyRelease(node);
  1322             long savedState = enableWait(node);
  1681             boolean timedout = false;
  1323             boolean cancelled = false, interrupted = false;
  1682             int interruptMode = 0;
  1324             while (!canReacquire(node)) {
  1683             while (!isOnSyncQueue(node)) {
  1325                 if ((interrupted |= Thread.interrupted()) ||
  1684                 if (System.currentTimeMillis() >= abstime) {
  1326                     System.currentTimeMillis() >= abstime) {
  1685                     timedout = transferAfterCancelledWait(node);
  1327                     if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
  1686                     break;
  1328                         break;
  1687                 }
  1329                 } else
  1688                 LockSupport.parkUntil(this, abstime);
  1330                     LockSupport.parkUntil(this, abstime);
  1689                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  1331             }
  1690                     break;
  1332             node.clearStatus();
  1691             }
  1333             acquire(node, savedState, false, false, false, 0L);
  1692             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1334             if (cancelled) {
  1693                 interruptMode = REINTERRUPT;
  1335                 unlinkCancelledWaiters(node);
  1694             if (node.nextWaiter != null)
  1336                 if (interrupted)
  1695                 unlinkCancelledWaiters();
  1337                     throw new InterruptedException();
  1696             if (interruptMode != 0)
  1338             } else if (interrupted)
  1697                 reportInterruptAfterWait(interruptMode);
  1339                 Thread.currentThread().interrupt();
  1698             return !timedout;
  1340             return !cancelled;
  1699         }
  1341         }
  1700 
  1342 
  1701         /**
  1343         /**
  1702          * Implements timed condition wait.
  1344          * Implements timed condition wait.
  1703          * <ol>
  1345          * <ol>
  1715         public final boolean await(long time, TimeUnit unit)
  1357         public final boolean await(long time, TimeUnit unit)
  1716                 throws InterruptedException {
  1358                 throws InterruptedException {
  1717             long nanosTimeout = unit.toNanos(time);
  1359             long nanosTimeout = unit.toNanos(time);
  1718             if (Thread.interrupted())
  1360             if (Thread.interrupted())
  1719                 throw new InterruptedException();
  1361                 throw new InterruptedException();
  1720             // We don't check for nanosTimeout <= 0L here, to allow
  1362             ConditionNode node = new ConditionNode();
  1721             // await(0, unit) as a way to "yield the lock".
  1363             long savedState = enableWait(node);
  1722             final long deadline = System.nanoTime() + nanosTimeout;
  1364             long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
  1723             Node node = addConditionWaiter();
  1365             long deadline = System.nanoTime() + nanos;
  1724             long savedState = fullyRelease(node);
  1366             boolean cancelled = false, interrupted = false;
  1725             boolean timedout = false;
  1367             while (!canReacquire(node)) {
  1726             int interruptMode = 0;
  1368                 if ((interrupted |= Thread.interrupted()) ||
  1727             while (!isOnSyncQueue(node)) {
  1369                     (nanos = deadline - System.nanoTime()) <= 0L) {
  1728                 if (nanosTimeout <= 0L) {
  1370                     if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
  1729                     timedout = transferAfterCancelledWait(node);
  1371                         break;
  1730                     break;
  1372                 } else
  1731                 }
  1373                     LockSupport.parkNanos(this, nanos);
  1732                 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
  1374             }
  1733                     LockSupport.parkNanos(this, nanosTimeout);
  1375             node.clearStatus();
  1734                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  1376             acquire(node, savedState, false, false, false, 0L);
  1735                     break;
  1377             if (cancelled) {
  1736                 nanosTimeout = deadline - System.nanoTime();
  1378                 unlinkCancelledWaiters(node);
  1737             }
  1379                 if (interrupted)
  1738             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1380                     throw new InterruptedException();
  1739                 interruptMode = REINTERRUPT;
  1381             } else if (interrupted)
  1740             if (node.nextWaiter != null)
  1382                 Thread.currentThread().interrupt();
  1741                 unlinkCancelledWaiters();
  1383             return !cancelled;
  1742             if (interruptMode != 0)
       
  1743                 reportInterruptAfterWait(interruptMode);
       
  1744             return !timedout;
       
  1745         }
  1384         }
  1746 
  1385 
  1747         //  support for instrumentation
  1386         //  support for instrumentation
  1748 
  1387 
  1749         /**
  1388         /**
  1765          *         returns {@code false}
  1404          *         returns {@code false}
  1766          */
  1405          */
  1767         protected final boolean hasWaiters() {
  1406         protected final boolean hasWaiters() {
  1768             if (!isHeldExclusively())
  1407             if (!isHeldExclusively())
  1769                 throw new IllegalMonitorStateException();
  1408                 throw new IllegalMonitorStateException();
  1770             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  1409             for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
  1771                 if (w.waitStatus == Node.CONDITION)
  1410                 if ((w.status & COND) != 0)
  1772                     return true;
  1411                     return true;
  1773             }
  1412             }
  1774             return false;
  1413             return false;
  1775         }
  1414         }
  1776 
  1415 
  1785          */
  1424          */
  1786         protected final int getWaitQueueLength() {
  1425         protected final int getWaitQueueLength() {
  1787             if (!isHeldExclusively())
  1426             if (!isHeldExclusively())
  1788                 throw new IllegalMonitorStateException();
  1427                 throw new IllegalMonitorStateException();
  1789             int n = 0;
  1428             int n = 0;
  1790             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  1429             for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
  1791                 if (w.waitStatus == Node.CONDITION)
  1430                 if ((w.status & COND) != 0)
  1792                     ++n;
  1431                     ++n;
  1793             }
  1432             }
  1794             return n;
  1433             return n;
  1795         }
  1434         }
  1796 
  1435 
  1805          */
  1444          */
  1806         protected final Collection<Thread> getWaitingThreads() {
  1445         protected final Collection<Thread> getWaitingThreads() {
  1807             if (!isHeldExclusively())
  1446             if (!isHeldExclusively())
  1808                 throw new IllegalMonitorStateException();
  1447                 throw new IllegalMonitorStateException();
  1809             ArrayList<Thread> list = new ArrayList<>();
  1448             ArrayList<Thread> list = new ArrayList<>();
  1810             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  1449             for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
  1811                 if (w.waitStatus == Node.CONDITION) {
  1450                 if ((w.status & COND) != 0) {
  1812                     Thread t = w.thread;
  1451                     Thread t = w.waiter;
  1813                     if (t != null)
  1452                     if (t != null)
  1814                         list.add(t);
  1453                         list.add(t);
  1815                 }
  1454                 }
  1816             }
  1455             }
  1817             return list;
  1456             return list;
  1818         }
  1457         }
  1819     }
  1458     }
  1820 
  1459 
  1821     // VarHandle mechanics
  1460     // Unsafe
  1822     private static final VarHandle STATE;
  1461     private static final Unsafe U = Unsafe.getUnsafe();
  1823     private static final VarHandle HEAD;
  1462     private static final long STATE
  1824     private static final VarHandle TAIL;
  1463         = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state");
       
  1464     private static final long HEAD
       
  1465         = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head");
       
  1466     private static final long TAIL
       
  1467         = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail");
  1825 
  1468 
  1826     static {
  1469     static {
  1827         try {
       
  1828             MethodHandles.Lookup l = MethodHandles.lookup();
       
  1829             STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class);
       
  1830             HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class);
       
  1831             TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class);
       
  1832         } catch (ReflectiveOperationException e) {
       
  1833             throw new ExceptionInInitializerError(e);
       
  1834         }
       
  1835 
       
  1836         // Reduce the risk of rare disastrous classloading in first call to
       
  1837         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
       
  1838         Class<?> ensureLoaded = LockSupport.class;
  1470         Class<?> ensureLoaded = LockSupport.class;
  1839     }
  1471     }
  1840 
       
  1841     /**
       
  1842      * Initializes head and tail fields on first contention.
       
  1843      */
       
  1844     private final void initializeSyncQueue() {
       
  1845         Node h;
       
  1846         if (HEAD.compareAndSet(this, null, (h = new Node())))
       
  1847             tail = h;
       
  1848     }
       
  1849 
       
  1850     /**
       
  1851      * CASes tail field.
       
  1852      */
       
  1853     private final boolean compareAndSetTail(Node expect, Node update) {
       
  1854         return TAIL.compareAndSet(this, expect, update);
       
  1855     }
       
  1856 }
  1472 }