--- a/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java Thu Oct 17 20:27:44 2019 +0100
+++ b/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java Thu Oct 17 20:53:35 2019 +0100
@@ -35,12 +35,12 @@
package java.util.concurrent.locks;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ForkJoinPool;
+import jdk.internal.misc.Unsafe;
/**
* Provides a framework for implementing blocking locks and related
@@ -312,265 +312,208 @@
*/
protected AbstractQueuedSynchronizer() { }
- /**
- * Wait queue node class.
+ /*
+ * Overview.
*
- * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
+ * The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
- * spinlocks. We instead use them for blocking synchronizers, but
- * use the same basic tactic of holding some of the control
- * information about a thread in the predecessor of its node. A
- * "status" field in each node keeps track of whether a thread
- * should block. A node is signalled when its predecessor
- * releases. Each node of the queue otherwise serves as a
- * specific-notification-style monitor holding a single waiting
- * thread. The status field does NOT control whether threads are
- * granted locks etc though. A thread may try to acquire if it is
- * first in the queue. But being first does not guarantee success;
- * it only gives the right to contend. So the currently released
- * contender thread may need to rewait.
+ * spinlocks. We instead use them for blocking synchronizers by
+ * including explicit ("prev" and "next") links plus a "status"
+ * field that allow nodes to signal successors when releasing
+ * locks, and handle cancellation due to interrupts and timeouts.
+ * The status field includes bits that track whether a thread
+ * needs a signal (using LockSupport.unpark). Despite these
+ * additions, we maintain most CLH locality properties.
+ *
+ * To enqueue into a CLH lock, you atomically splice it in as new
+ * tail. To dequeue, you set the head field, so the next eligible
+ * waiter becomes first.
*
- * <p>To enqueue into a CLH lock, you atomically splice it in as new
- * tail. To dequeue, you just set the head field.
- * <pre>
- * +------+ prev +-----+ +-----+
- * head | | <---- | | <---- | | tail
- * +------+ +-----+ +-----+
- * </pre>
+ * +------+ prev +-------+ +------+
+ * | head | <---- | first | <---- | tail |
+ * +------+ +-------+ +------+
+ *
+ * Insertion into a CLH queue requires only a single atomic
+ * operation on "tail", so there is a simple point of demarcation
+ * from unqueued to queued. The "next" link of the predecessor is
+ * set by the enqueuing thread after successful CAS. Even though
+ * non-atomic, this suffices to ensure that any blocked thread is
+ * signalled by a predecessor when eligible (although in the case
+ * of cancellation, possibly with the assistance of a signal in
+ * method cleanQueue). Signalling is based in part on a
+ * Dekker-like scheme in which the to-be waiting thread indicates
+ * WAITING status, then retries acquiring, and then rechecks
+ * status before blocking. The signaller atomically clears WAITING
+ * status when unparking.
*
- * <p>Insertion into a CLH queue requires only a single atomic
- * operation on "tail", so there is a simple atomic point of
- * demarcation from unqueued to queued. Similarly, dequeuing
- * involves only updating the "head". However, it takes a bit
- * more work for nodes to determine who their successors are,
- * in part to deal with possible cancellation due to timeouts
- * and interrupts.
- *
- * <p>The "prev" links (not used in original CLH locks), are mainly
- * needed to handle cancellation. If a node is cancelled, its
- * successor is (normally) relinked to a non-cancelled
- * predecessor. For explanation of similar mechanics in the case
- * of spin locks, see the papers by Scott and Scherer at
- * http://www.cs.rochester.edu/u/scott/synchronization/
+ * Dequeuing on acquire involves detaching (nulling) a node's
+ * "prev" node and then updating the "head". Other threads check
+ * if a node is or was dequeued by checking "prev" rather than
+ * head. We enforce the nulling then setting order by spin-waiting
+ * if necessary. Because of this, the lock algorithm is not itself
+ * strictly "lock-free" because an acquiring thread may need to
+ * wait for a previous acquire to make progress. When used with
+ * exclusive locks, such progress is required anyway. However
+ * Shared mode may (uncommonly) require a spin-wait before
+ * setting head field to ensure proper propagation. (Historical
+ * note: This allows some simplifications and efficiencies
+ * compared to previous versions of this class.)
*
- * <p>We also use "next" links to implement blocking mechanics.
- * The thread id for each node is kept in its own node, so a
- * predecessor signals the next node to wake up by traversing
- * next link to determine which thread it is. Determination of
- * successor must avoid races with newly queued nodes to set
- * the "next" fields of their predecessors. This is solved
- * when necessary by checking backwards from the atomically
- * updated "tail" when a node's successor appears to be null.
- * (Or, said differently, the next-links are an optimization
- * so that we don't usually need a backward scan.)
+ * A node's predecessor can change due to cancellation while it is
+ * waiting, until the node is first in queue, at which point it
+ * cannot change. The acquire methods cope with this by rechecking
+ * "prev" before waiting. The prev and next fields are modified
+ * only via CAS by cancelled nodes in method cleanQueue. The
+ * unsplice strategy is reminiscent of Michael-Scott queues in
+ * that after a successful CAS to prev field, other threads help
+ * fix next fields. Because cancellation often occurs in bunches
+ * that complicate decisions about necessary signals, each call to
+ * cleanQueue traverses the queue until a clean sweep. Nodes that
+ * become relinked as first are unconditionally unparked
+ * (sometimes unnecessarily, but those cases are not worth
+ * avoiding).
*
- * <p>Cancellation introduces some conservatism to the basic
- * algorithms. Since we must poll for cancellation of other
- * nodes, we can miss noticing whether a cancelled node is
- * ahead or behind us. This is dealt with by always unparking
- * successors upon cancellation, allowing them to stabilize on
- * a new predecessor, unless we can identify an uncancelled
- * predecessor who will carry this responsibility.
+ * A thread may try to acquire if it is first (frontmost) in the
+ * queue, and sometimes before. Being first does not guarantee
+ * success; it only gives the right to contend. We balance
+ * throughput, overhead, and fairness by allowing incoming threads
+ * to "barge" and acquire the synchronizer while in the process of
+ * enqueuing, in which case an awakened first thread may need to
+ * rewait. To counteract possible repeated unlucky rewaits, we
+ * exponentially increase retries (up to 256) to acquire each time
+ * a thread is unparked. Except in this case, AQS locks do not
+ * spin; they instead interleave attempts to acquire with
+ * bookkeeping steps. (Users who want spinlocks can use
+ * tryAcquire.)
*
- * <p>CLH queues need a dummy header node to get started. But
+ * To improve garbage collectibility, fields of nodes not yet on
+ * list are null. (It is not rare to create and then throw away a
+ * node without using it.) Fields of nodes coming off the list are
+ * nulled out as soon as possible. This accentuates the challenge
+ * of externally determining the first waiting thread (as in
+ * method getFirstQueuedThread). This sometimes requires the
+ * fallback of traversing backwards from the atomically updated
+ * "tail" when fields appear null. (This is never needed in the
+ * process of signalling though.)
+ *
+ * CLH queues need a dummy header node to get started. But
* we don't create them on construction, because it would be wasted
* effort if there is never contention. Instead, the node
* is constructed and head and tail pointers are set upon first
* contention.
*
- * <p>Threads waiting on Conditions use the same nodes, but
- * use an additional link. Conditions only need to link nodes
- * in simple (non-concurrent) linked queues because they are
- * only accessed when exclusively held. Upon await, a node is
- * inserted into a condition queue. Upon signal, the node is
- * transferred to the main queue. A special value of status
- * field is used to mark which queue a node is on.
+ * Shared mode operations differ from Exclusive in that an acquire
+ * signals the next waiter to try to acquire if it is also
+ * Shared. The tryAcquireShared API allows users to indicate the
+ * degree of propagation, but in most applications, it is more
+ * efficient to ignore this, allowing the successor to try
+ * acquiring in any case.
+ *
+ * Threads waiting on Conditions use nodes with an additional
+ * link to maintain the (FIFO) list of conditions. Conditions only
+ * need to link nodes in simple (non-concurrent) linked queues
+ * because they are only accessed when exclusively held. Upon
+ * await, a node is inserted into a condition queue. Upon signal,
+ * the node is enqueued on the main queue. A special status field
+ * value is used to track and atomically trigger this.
*
- * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
+ * Accesses to fields head, tail, and state use full Volatile
+ * mode, along with CAS. Node fields status, prev and next also do
+ * so while threads may be signallable, but sometimes use weaker
+ * modes otherwise. Accesses to field "waiter" (the thread to be
+ * signalled) are always sandwiched between other atomic accesses
+ * so are used in Plain mode. We use jdk.internal Unsafe versions
+ * of atomic access methods rather than VarHandles to avoid
+ * potential VM bootstrap issues.
+ *
+ * Most of the above is performed by primary internal method
+ * acquire, that is invoked in some way by all exported acquire
+ * methods. (It is usually easy for compilers to optimize
+ * call-site specializations when heavily used.)
+ *
+ * There are several arbitrary decisions about when and how to
+ * check interrupts in both acquire and await before and/or after
+ * blocking. The decisions are less arbitrary in implementation
+ * updates because some users appear to rely on original behaviors
+ * in ways that are racy and so (rarely) wrong in general but hard
+ * to justify changing.
+ *
+ * Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
* Scherer and Michael Scott, along with members of JSR-166
* expert group, for helpful ideas, discussions, and critiques
* on the design of this class.
*/
- static final class Node {
- /** Marker to indicate a node is waiting in shared mode */
- static final Node SHARED = new Node();
- /** Marker to indicate a node is waiting in exclusive mode */
- static final Node EXCLUSIVE = null;
+
+ // Node status bits, also used as argument and return values
+ static final int WAITING = 1; // must be 1
+ static final int CANCELLED = 0x80000000; // must be negative
+ static final int COND = 2; // in a condition wait
- /** waitStatus value to indicate thread has cancelled. */
- static final int CANCELLED = 1;
- /** waitStatus value to indicate successor's thread needs unparking. */
- static final int SIGNAL = -1;
- /** waitStatus value to indicate thread is waiting on condition. */
- static final int CONDITION = -2;
- /**
- * waitStatus value to indicate the next acquireShared should
- * unconditionally propagate.
- */
- static final int PROPAGATE = -3;
+ /** CLH Nodes */
+ abstract static class Node {
+ volatile Node prev; // initially attached via casTail
+ volatile Node next; // visibly nonnull when signallable
+ Thread waiter; // visibly nonnull when enqueued
+ volatile int status; // written by owner, atomic bit ops by others
- /**
- * Status field, taking on only the values:
- * SIGNAL: The successor of this node is (or will soon be)
- * blocked (via park), so the current node must
- * unpark its successor when it releases or
- * cancels. To avoid races, acquire methods must
- * first indicate they need a signal,
- * then retry the atomic acquire, and then,
- * on failure, block.
- * CANCELLED: This node is cancelled due to timeout or interrupt.
- * Nodes never leave this state. In particular,
- * a thread with cancelled node never again blocks.
- * CONDITION: This node is currently on a condition queue.
- * It will not be used as a sync queue node
- * until transferred, at which time the status
- * will be set to 0. (Use of this value here has
- * nothing to do with the other uses of the
- * field, but simplifies mechanics.)
- * PROPAGATE: A releaseShared should be propagated to other
- * nodes. This is set (for head node only) in
- * doReleaseShared to ensure propagation
- * continues, even if other operations have
- * since intervened.
- * 0: None of the above
- *
- * The values are arranged numerically to simplify use.
- * Non-negative values mean that a node doesn't need to
- * signal. So, most code doesn't need to check for particular
- * values, just for sign.
- *
- * The field is initialized to 0 for normal sync nodes, and
- * CONDITION for condition nodes. It is modified using CAS
- * (or when possible, unconditional volatile writes).
- */
- volatile int waitStatus;
+ // methods for atomic operations
+ final boolean casPrev(Node c, Node v) { // for cleanQueue
+ return U.weakCompareAndSetReference(this, PREV, c, v);
+ }
+ final boolean casNext(Node c, Node v) { // for cleanQueue
+ return U.weakCompareAndSetReference(this, NEXT, c, v);
+ }
+ final int getAndUnsetStatus(int v) { // for signalling
+ return U.getAndBitwiseAndInt(this, STATUS, ~v);
+ }
+ final void setPrevRelaxed(Node p) { // for off-queue assignment
+ U.putReference(this, PREV, p);
+ }
+ final void setStatusRelaxed(int s) { // for off-queue assignment
+ U.putInt(this, STATUS, s);
+ }
+ final void clearStatus() { // for reducing unneeded signals
+ U.putIntOpaque(this, STATUS, 0);
+ }
- /**
- * Link to predecessor node that current node/thread relies on
- * for checking waitStatus. Assigned during enqueuing, and nulled
- * out (for sake of GC) only upon dequeuing. Also, upon
- * cancellation of a predecessor, we short-circuit while
- * finding a non-cancelled one, which will always exist
- * because the head node is never cancelled: A node becomes
- * head only as a result of successful acquire. A
- * cancelled thread never succeeds in acquiring, and a thread only
- * cancels itself, not any other node.
- */
- volatile Node prev;
+ private static final long STATUS
+ = U.objectFieldOffset(Node.class, "status");
+ private static final long NEXT
+ = U.objectFieldOffset(Node.class, "next");
+ private static final long PREV
+ = U.objectFieldOffset(Node.class, "prev");
+ }
- /**
- * Link to the successor node that the current node/thread
- * unparks upon release. Assigned during enqueuing, adjusted
- * when bypassing cancelled predecessors, and nulled out (for
- * sake of GC) when dequeued. The enq operation does not
- * assign next field of a predecessor until after attachment,
- * so seeing a null next field does not necessarily mean that
- * node is at end of queue. However, if a next field appears
- * to be null, we can scan prev's from the tail to
- * double-check. The next field of cancelled nodes is set to
- * point to the node itself instead of null, to make life
- * easier for isOnSyncQueue.
- */
- volatile Node next;
+ // Concrete classes tagged by type
+ static final class ExclusiveNode extends Node { }
+ static final class SharedNode extends Node { }
+
+ static final class ConditionNode extends Node
+ implements ForkJoinPool.ManagedBlocker {
+ ConditionNode nextWaiter; // link to next waiting node
/**
- * The thread that enqueued this node. Initialized on
- * construction and nulled out after use.
- */
- volatile Thread thread;
-
- /**
- * Link to next node waiting on condition, or the special
- * value SHARED. Because condition queues are accessed only
- * when holding in exclusive mode, we just need a simple
- * linked queue to hold nodes while they are waiting on
- * conditions. They are then transferred to the queue to
- * re-acquire. And because conditions can only be exclusive,
- * we save a field by using special value to indicate shared
- * mode.
+ * Allows Conditions to be used in ForkJoinPools without
+ * risking fixed pool exhaustion. This is usable only for
+ * untimed Condition waits, not timed versions.
*/
- Node nextWaiter;
-
- /**
- * Returns true if node is waiting in shared mode.
- */
- final boolean isShared() {
- return nextWaiter == SHARED;
- }
-
- /**
- * Returns previous node, or throws NullPointerException if null.
- * Use when predecessor cannot be null. The null check could
- * be elided, but is present to help the VM.
- *
- * @return the predecessor of this node
- */
- final Node predecessor() {
- Node p = prev;
- if (p == null)
- throw new NullPointerException();
- else
- return p;
+ public final boolean isReleasable() {
+ return status <= 1 || Thread.currentThread().isInterrupted();
}
- /** Establishes initial head or SHARED marker. */
- Node() {}
-
- /** Constructor used by addWaiter. */
- Node(Node nextWaiter) {
- this.nextWaiter = nextWaiter;
- THREAD.set(this, Thread.currentThread());
- }
-
- /** Constructor used by addConditionWaiter. */
- Node(int waitStatus) {
- WAITSTATUS.set(this, waitStatus);
- THREAD.set(this, Thread.currentThread());
- }
-
- /** CASes waitStatus field. */
- final boolean compareAndSetWaitStatus(int expect, int update) {
- return WAITSTATUS.compareAndSet(this, expect, update);
- }
-
- /** CASes next field. */
- final boolean compareAndSetNext(Node expect, Node update) {
- return NEXT.compareAndSet(this, expect, update);
- }
-
- final void setPrevRelaxed(Node p) {
- PREV.set(this, p);
- }
-
- // VarHandle mechanics
- private static final VarHandle NEXT;
- private static final VarHandle PREV;
- private static final VarHandle THREAD;
- private static final VarHandle WAITSTATUS;
- static {
- try {
- MethodHandles.Lookup l = MethodHandles.lookup();
- NEXT = l.findVarHandle(Node.class, "next", Node.class);
- PREV = l.findVarHandle(Node.class, "prev", Node.class);
- THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
- WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
- } catch (ReflectiveOperationException e) {
- throw new ExceptionInInitializerError(e);
- }
+ public final boolean block() {
+ while (!isReleasable()) LockSupport.park();
+ return true;
}
}
/**
- * Head of the wait queue, lazily initialized. Except for
- * initialization, it is modified only via method setHead. Note:
- * If head exists, its waitStatus is guaranteed not to be
- * CANCELLED.
+ * Head of the wait queue, lazily initialized.
*/
private transient volatile Node head;
/**
- * Tail of the wait queue, lazily initialized. Modified only via
- * method enq to add new wait node.
+ * Tail of the wait queue. After initialization, modified only via casTail.
*/
private transient volatile Node tail;
@@ -609,481 +552,235 @@
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
- return STATE.compareAndSet(this, expect, update);
+ return U.compareAndSetInt(this, STATE, expect, update);
}
// Queuing utilities
- /**
- * The number of nanoseconds for which it is faster to spin
- * rather than to use timed park. A rough estimate suffices
- * to improve responsiveness with very short timeouts.
- */
- static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
+ private boolean casTail(Node c, Node v) {
+ return U.compareAndSetReference(this, TAIL, c, v);
+ }
+
+ /** tries once to CAS a new dummy node for head */
+ private void tryInitializeHead() {
+ Node h = new ExclusiveNode();
+ if (U.compareAndSetReference(this, HEAD, null, h))
+ tail = h;
+ }
/**
- * Inserts node into queue, initializing if necessary. See picture above.
- * @param node the node to insert
- * @return node's predecessor
+ * Enqueues the node unless null. (Currently used only for
+ * ConditionNodes; other cases are interleaved with acquires.)
*/
- private Node enq(Node node) {
- for (;;) {
- Node oldTail = tail;
- if (oldTail != null) {
- node.setPrevRelaxed(oldTail);
- if (compareAndSetTail(oldTail, node)) {
- oldTail.next = node;
- return oldTail;
+ final void enqueue(Node node) {
+ if (node != null) {
+ for (;;) {
+ Node t = tail;
+ node.setPrevRelaxed(t); // avoid unnecessary fence
+ if (t == null) // initialize
+ tryInitializeHead();
+ else if (casTail(t, node)) {
+ t.next = node;
+ if (t.status < 0) // wake up to clean link
+ LockSupport.unpark(node.waiter);
+ break;
}
- } else {
- initializeSyncQueue();
}
}
}
+ /** Returns true if node is found in traversal from tail */
+ final boolean isEnqueued(Node node) {
+ for (Node t = tail; t != null; t = t.prev)
+ if (t == node)
+ return true;
+ return false;
+ }
+
/**
- * Creates and enqueues node for current thread and given mode.
+ * Wakes up the successor of given node, if one exists, and unsets its
+ * WAITING status to avoid park race. This may fail to wake up an
+ * eligible thread when one or more have been cancelled, but
+ * cancelAcquire ensures liveness.
+ */
+ private static void signalNext(Node h) {
+ Node s;
+ if (h != null && (s = h.next) != null && s.status != 0) {
+ s.getAndUnsetStatus(WAITING);
+ LockSupport.unpark(s.waiter);
+ }
+ }
+
+ /** Wakes up the given node if in shared mode */
+ private static void signalNextIfShared(Node h) {
+ Node s;
+ if (h != null && (s = h.next) != null &&
+ (s instanceof SharedNode) && s.status != 0) {
+ s.getAndUnsetStatus(WAITING);
+ LockSupport.unpark(s.waiter);
+ }
+ }
+
+ /**
+ * Main acquire method, invoked by all exported acquire methods.
*
- * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
- * @return the new node
+ * @param node null unless a reacquiring Condition
+ * @param arg the acquire argument
+ * @param shared true if shared mode else exclusive
+ * @param interruptible if abort and return negative on interrupt
+ * @param timed if true use timed waits
+ * @param time if timed, the System.nanoTime value to timeout
+ * @return positive if acquired, 0 if timed out, negative if interrupted
*/
- private Node addWaiter(Node mode) {
- Node node = new Node(mode);
+ final int acquire(Node node, int arg, boolean shared,
+ boolean interruptible, boolean timed, long time) {
+ Thread current = Thread.currentThread();
+ byte spins = 0, postSpins = 0; // retries upon unpark of first thread
+ boolean interrupted = false, first = false;
+ Node pred = null; // predecessor of node when enqueued
+
+ /*
+ * Repeatedly:
+ * Check if node now first
+ * if so, ensure head stable, else ensure valid predecessor
+ * if node is first or not yet enqueued, try acquiring
+ * else if node not yet created, create it
+ * else if not yet enqueued, try once to enqueue
+ * else if woken from park, retry (up to postSpins times)
+ * else if WAITING status not set, set and retry
+ * else park and clear WAITING status, and check cancellation
+ */
for (;;) {
- Node oldTail = tail;
- if (oldTail != null) {
- node.setPrevRelaxed(oldTail);
- if (compareAndSetTail(oldTail, node)) {
- oldTail.next = node;
- return node;
+ if (!first && (pred = (node == null) ? null : node.prev) != null &&
+ !(first = (head == pred))) {
+ if (pred.status < 0) {
+ cleanQueue(); // predecessor cancelled
+ continue;
+ } else if (pred.prev == null) {
+ Thread.onSpinWait(); // ensure serialization
+ continue;
+ }
+ }
+ if (first || pred == null) {
+ boolean acquired;
+ try {
+ if (shared)
+ acquired = (tryAcquireShared(arg) >= 0);
+ else
+ acquired = tryAcquire(arg);
+ } catch (Throwable ex) {
+ cancelAcquire(node, interrupted, false);
+ throw ex;
+ }
+ if (acquired) {
+ if (first) {
+ node.prev = null;
+ head = node;
+ pred.next = null;
+ node.waiter = null;
+ if (shared)
+ signalNextIfShared(node);
+ if (interrupted)
+ current.interrupt();
+ }
+ return 1;
}
+ }
+ if (node == null) { // allocate; retry before enqueue
+ if (shared)
+ node = new SharedNode();
+ else
+ node = new ExclusiveNode();
+ } else if (pred == null) { // try to enqueue
+ node.waiter = current;
+ Node t = tail;
+ node.setPrevRelaxed(t); // avoid unnecessary fence
+ if (t == null)
+ tryInitializeHead();
+ else if (!casTail(t, node))
+ node.setPrevRelaxed(null); // back out
+ else
+ t.next = node;
+ } else if (first && spins != 0) {
+ --spins; // reduce unfairness on rewaits
+ Thread.onSpinWait();
+ } else if (node.status == 0) {
+ node.status = WAITING; // enable signal and recheck
} else {
- initializeSyncQueue();
+ long nanos;
+ spins = postSpins = (byte)((postSpins << 1) | 1);
+ if (!timed)
+ LockSupport.park(this);
+ else if ((nanos = time - System.nanoTime()) > 0L)
+ LockSupport.parkNanos(this, nanos);
+ else
+ break;
+ node.clearStatus();
+ if ((interrupted |= Thread.interrupted()) && interruptible)
+ break;
+ }
+ }
+ return cancelAcquire(node, interrupted, interruptible);
+ }
+
+ /**
+ * Possibly repeatedly traverses from tail, unsplicing cancelled
+ * nodes until none are found. Unparks nodes that may have been
+ * relinked to be next eligible acquirer.
+ */
+ private void cleanQueue() {
+ for (;;) { // restart point
+ for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
+ if (q == null || (p = q.prev) == null)
+ return; // end of list
+ if (s == null ? tail != q : (s.prev != q || s.status < 0))
+ break; // inconsistent
+ if (q.status < 0) { // cancelled
+ if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
+ q.prev == p) {
+ p.casNext(q, s); // OK if fails
+ if (p.prev == null)
+ signalNext(p);
+ }
+ break;
+ }
+ if ((n = p.next) != q) { // help finish
+ if (n != null && q.prev == p) {
+ p.casNext(n, q);
+ if (p.prev == null)
+ signalNext(p);
+ }
+ break;
+ }
+ s = q;
+ q = q.prev;
}
}
}
/**
- * Sets head of queue to be node, thus dequeuing. Called only by
- * acquire methods. Also nulls out unused fields for sake of GC
- * and to suppress unnecessary signals and traversals.
- *
- * @param node the node
- */
- private void setHead(Node node) {
- head = node;
- node.thread = null;
- node.prev = null;
- }
-
- /**
- * Wakes up node's successor, if one exists.
- *
- * @param node the node
- */
- private void unparkSuccessor(Node node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
- int ws = node.waitStatus;
- if (ws < 0)
- node.compareAndSetWaitStatus(ws, 0);
-
- /*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently null,
- * traverse backwards from tail to find the actual
- * non-cancelled successor.
- */
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node p = tail; p != node && p != null; p = p.prev)
- if (p.waitStatus <= 0)
- s = p;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
-
- /**
- * Release action for shared mode -- signals successor and ensures
- * propagation. (Note: For exclusive mode, release just amounts
- * to calling unparkSuccessor of head if it needs signal.)
- */
- private void doReleaseShared() {
- /*
- * Ensure that a release propagates, even if there are other
- * in-progress acquires/releases. This proceeds in the usual
- * way of trying to unparkSuccessor of head if it needs
- * signal. But if it does not, status is set to PROPAGATE to
- * ensure that upon release, propagation continues.
- * Additionally, we must loop in case a new node is added
- * while we are doing this. Also, unlike other uses of
- * unparkSuccessor, we need to know if CAS to reset status
- * fails, if so rechecking.
- */
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h);
- }
- else if (ws == 0 &&
- !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
-
- /**
- * Sets head of queue, and checks if successor may be waiting
- * in shared mode, if so propagating if either propagate > 0 or
- * PROPAGATE status was set.
- *
- * @param node the node
- * @param propagate the return value from a tryAcquireShared
- */
- private void setHeadAndPropagate(Node node, int propagate) {
- Node h = head; // Record old head for check below
- setHead(node);
- /*
- * Try to signal next queued node if:
- * Propagation was indicated by caller,
- * or was recorded (as h.waitStatus either before
- * or after setHead) by a previous operation
- * (note: this uses sign-check of waitStatus because
- * PROPAGATE status may transition to SIGNAL.)
- * and
- * The next node is waiting in shared mode,
- * or we don't know, because it appears null
- *
- * The conservatism in both of these checks may cause
- * unnecessary wake-ups, but only when there are multiple
- * racing acquires/releases, so most need signals now or soon
- * anyway.
- */
- if (propagate > 0 || h == null || h.waitStatus < 0 ||
- (h = head) == null || h.waitStatus < 0) {
- Node s = node.next;
- if (s == null || s.isShared())
- doReleaseShared();
- }
- }
-
- // Utilities for various versions of acquire
-
- /**
* Cancels an ongoing attempt to acquire.
*
- * @param node the node
- */
- private void cancelAcquire(Node node) {
- // Ignore if node doesn't exist
- if (node == null)
- return;
-
- node.thread = null;
-
- // Skip cancelled predecessors
- Node pred = node.prev;
- while (pred.waitStatus > 0)
- node.prev = pred = pred.prev;
-
- // predNext is the apparent node to unsplice. CASes below will
- // fail if not, in which case, we lost race vs another cancel
- // or signal, so no further action is necessary, although with
- // a possibility that a cancelled node may transiently remain
- // reachable.
- Node predNext = pred.next;
-
- // Can use unconditional write instead of CAS here.
- // After this atomic step, other Nodes can skip past us.
- // Before, we are free of interference from other threads.
- node.waitStatus = Node.CANCELLED;
-
- // If we are the tail, remove ourselves.
- if (node == tail && compareAndSetTail(node, pred)) {
- pred.compareAndSetNext(predNext, null);
- } else {
- // If successor needs signal, try to set pred's next-link
- // so it will get one. Otherwise wake it up to propagate.
- int ws;
- if (pred != head &&
- ((ws = pred.waitStatus) == Node.SIGNAL ||
- (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
- pred.thread != null) {
- Node next = node.next;
- if (next != null && next.waitStatus <= 0)
- pred.compareAndSetNext(predNext, next);
- } else {
- unparkSuccessor(node);
- }
-
- node.next = node; // help GC
- }
- }
-
- /**
- * Checks and updates status for a node that failed to acquire.
- * Returns true if thread should block. This is the main signal
- * control in all acquire loops. Requires that pred == node.prev.
- *
- * @param pred node's predecessor holding status
- * @param node the node
- * @return {@code true} if thread should block
- */
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park.
- */
- return true;
- if (ws > 0) {
- /*
- * Predecessor was cancelled. Skip over predecessors and
- * indicate retry.
- */
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
- }
- return false;
- }
-
- /**
- * Convenience method to interrupt current thread.
- */
- static void selfInterrupt() {
- Thread.currentThread().interrupt();
- }
-
- /**
- * Convenience method to park and then check if interrupted.
- *
- * @return {@code true} if interrupted
- */
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
-
- /*
- * Various flavors of acquire, varying in exclusive/shared and
- * control modes. Each is mostly the same, but annoyingly
- * different. Only a little bit of factoring is possible due to
- * interactions of exception mechanics (including ensuring that we
- * cancel if tryAcquire throws exception) and other control, at
- * least not without hurting performance too much.
- */
-
- /**
- * Acquires in exclusive uninterruptible mode for thread already in
- * queue. Used by condition wait methods as well as acquire.
- *
- * @param node the node
- * @param arg the acquire argument
- * @return {@code true} if interrupted while waiting
- */
- final boolean acquireQueued(final Node node, int arg) {
- boolean interrupted = false;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node))
- interrupted |= parkAndCheckInterrupt();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- if (interrupted)
- selfInterrupt();
- throw t;
- }
- }
-
- /**
- * Acquires in exclusive interruptible mode.
- * @param arg the acquire argument
+ * @param node the node (may be null if cancelled before enqueuing)
+ * @param interrupted true if thread interrupted
+ * @param interruptible if should report interruption vs reset
*/
- private void doAcquireInterruptibly(int arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.EXCLUSIVE);
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- return;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
- }
- }
-
- /**
- * Acquires in exclusive timed mode.
- *
- * @param arg the acquire argument
- * @param nanosTimeout max wait time
- * @return {@code true} if acquired
- */
- private boolean doAcquireNanos(int arg, long nanosTimeout)
- throws InterruptedException {
- if (nanosTimeout <= 0L)
- return false;
- final long deadline = System.nanoTime() + nanosTimeout;
- final Node node = addWaiter(Node.EXCLUSIVE);
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- return true;
- }
- nanosTimeout = deadline - System.nanoTime();
- if (nanosTimeout <= 0L) {
- cancelAcquire(node);
- return false;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanosTimeout);
- if (Thread.interrupted())
- throw new InterruptedException();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
+ private int cancelAcquire(Node node, boolean interrupted,
+ boolean interruptible) {
+ if (node != null) {
+ node.waiter = null;
+ node.status = CANCELLED;
+ if (node.prev != null)
+ cleanQueue();
}
- }
-
- /**
- * Acquires in shared uninterruptible mode.
- * @param arg the acquire argument
- */
- private void doAcquireShared(int arg) {
- final Node node = addWaiter(Node.SHARED);
- boolean interrupted = false;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node))
- interrupted |= parkAndCheckInterrupt();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
- } finally {
- if (interrupted)
- selfInterrupt();
+ if (interrupted) {
+ if (interruptible)
+ return CANCELLED;
+ else
+ Thread.currentThread().interrupt();
}
- }
-
- /**
- * Acquires in shared interruptible mode.
- * @param arg the acquire argument
- */
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.SHARED);
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
- }
- }
-
- /**
- * Acquires in shared timed mode.
- *
- * @param arg the acquire argument
- * @param nanosTimeout max wait time
- * @return {@code true} if acquired
- */
- private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
- throws InterruptedException {
- if (nanosTimeout <= 0L)
- return false;
- final long deadline = System.nanoTime() + nanosTimeout;
- final Node node = addWaiter(Node.SHARED);
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- return true;
- }
- }
- nanosTimeout = deadline - System.nanoTime();
- if (nanosTimeout <= 0L) {
- cancelAcquire(node);
- return false;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanosTimeout);
- if (Thread.interrupted())
- throw new InterruptedException();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
- }
+ return 0;
}
// Main exported methods
@@ -1236,9 +933,8 @@
* can represent anything you like.
*/
public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
+ if (!tryAcquire(arg))
+ acquire(null, arg, false, false, false, 0L);
}
/**
@@ -1256,11 +952,10 @@
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
+ throws InterruptedException {
+ if (Thread.interrupted() ||
+ (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
throw new InterruptedException();
- if (!tryAcquire(arg))
- doAcquireInterruptibly(arg);
}
/**
@@ -1281,11 +976,20 @@
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- return tryAcquire(arg) ||
- doAcquireNanos(arg, nanosTimeout);
+ throws InterruptedException {
+ if (!Thread.interrupted()) {
+ if (tryAcquire(arg))
+ return true;
+ if (nanosTimeout <= 0L)
+ return false;
+ int stat = acquire(null, arg, false, true, true,
+ System.nanoTime() + nanosTimeout);
+ if (stat > 0)
+ return true;
+ if (stat == 0)
+ return false;
+ }
+ throw new InterruptedException();
}
/**
@@ -1300,9 +1004,7 @@
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
+ signalNext(head);
return true;
}
return false;
@@ -1321,7 +1023,7 @@
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
- doAcquireShared(arg);
+ acquire(null, arg, true, false, false, 0L);
}
/**
@@ -1338,11 +1040,11 @@
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
+ throws InterruptedException {
+ if (Thread.interrupted() ||
+ (tryAcquireShared(arg) < 0 &&
+ acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
}
/**
@@ -1363,10 +1065,19 @@
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- return tryAcquireShared(arg) >= 0 ||
- doAcquireSharedNanos(arg, nanosTimeout);
+ if (!Thread.interrupted()) {
+ if (tryAcquireShared(arg) >= 0)
+ return true;
+ if (nanosTimeout <= 0L)
+ return false;
+ int stat = acquire(null, arg, true, true, true,
+ System.nanoTime() + nanosTimeout);
+ if (stat > 0)
+ return true;
+ if (stat == 0)
+ return false;
+ }
+ throw new InterruptedException();
}
/**
@@ -1380,7 +1091,7 @@
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
- doReleaseShared();
+ signalNext(head);
return true;
}
return false;
@@ -1398,7 +1109,7 @@
*/
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
- if (p.waitStatus <= 0)
+ if (p.status >= 0)
return true;
return false;
}
@@ -1428,45 +1139,16 @@
* {@code null} if no threads are currently queued
*/
public final Thread getFirstQueuedThread() {
- // handle only fast path, else relay
- return (head == tail) ? null : fullGetFirstQueuedThread();
- }
-
- /**
- * Version of getFirstQueuedThread called when fastpath fails.
- */
- private Thread fullGetFirstQueuedThread() {
- /*
- * The first node is normally head.next. Try to get its
- * thread field, ensuring consistent reads: If thread
- * field is nulled out or s.prev is no longer head, then
- * some other thread(s) concurrently performed setHead in
- * between some of our reads. We try this twice before
- * resorting to traversal.
- */
- Node h, s;
- Thread st;
- if (((h = head) != null && (s = h.next) != null &&
- s.prev == head && (st = s.thread) != null) ||
- ((h = head) != null && (s = h.next) != null &&
- s.prev == head && (st = s.thread) != null))
- return st;
-
- /*
- * Head's next field might not have been set yet, or may have
- * been unset after setHead. So we must check to see if tail
- * is actually first node. If not, we continue on, safely
- * traversing from tail back to head to find first,
- * guaranteeing termination.
- */
-
- Thread firstThread = null;
- for (Node p = tail; p != null && p != head; p = p.prev) {
- Thread t = p.thread;
- if (t != null)
- firstThread = t;
+ Thread first = null, w; Node h, s;
+ if ((h = head) != null && ((s = h.next) == null ||
+ (first = s.waiter) == null ||
+ s.prev == null)) {
+ // traverse from tail on stale reads
+ for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
+ if ((w = p.waiter) != null)
+ first = w;
}
- return firstThread;
+ return first;
}
/**
@@ -1483,7 +1165,7 @@
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
- if (p.thread == thread)
+ if (p.waiter == thread)
return true;
return false;
}
@@ -1499,10 +1181,8 @@
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
- return (h = head) != null &&
- (s = h.next) != null &&
- !s.isShared() &&
- s.thread != null;
+ return (h = head) != null && (s = h.next) != null &&
+ !(s instanceof SharedNode) && s.waiter != null;
}
/**
@@ -1549,19 +1229,12 @@
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
- Node h, s;
- if ((h = head) != null) {
- if ((s = h.next) == null || s.waitStatus > 0) {
- s = null; // traverse in case of concurrent cancellation
- for (Node p = tail; p != h && p != null; p = p.prev) {
- if (p.waitStatus <= 0)
- s = p;
- }
- }
- if (s != null && s.thread != Thread.currentThread())
- return true;
- }
- return false;
+ Thread first = null; Node h, s;
+ if ((h = head) != null && ((s = h.next) == null ||
+ (first = s.waiter) == null ||
+ s.prev == null))
+ first = getFirstQueuedThread(); // retry via getFirstQueuedThread
+ return first != null && first != Thread.currentThread();
}
// Instrumentation and monitoring methods
@@ -1578,7 +1251,7 @@
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
- if (p.thread != null)
+ if (p.waiter != null)
++n;
}
return n;
@@ -1598,7 +1271,7 @@
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
- Thread t = p.thread;
+ Thread t = p.waiter;
if (t != null)
list.add(t);
}
@@ -1616,8 +1289,8 @@
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
- if (!p.isShared()) {
- Thread t = p.thread;
+ if (!(p instanceof SharedNode)) {
+ Thread t = p.waiter;
if (t != null)
list.add(t);
}
@@ -1636,8 +1309,8 @@
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
- if (p.isShared()) {
- Thread t = p.thread;
+ if (p instanceof SharedNode) {
+ Thread t = p.waiter;
if (t != null)
list.add(t);
}
@@ -1660,117 +1333,6 @@
+ (hasQueuedThreads() ? "non" : "") + "empty queue]";
}
-
- // Internal support methods for Conditions
-
- /**
- * Returns true if a node, always one that was initially placed on
- * a condition queue, is now waiting to reacquire on sync queue.
- * @param node the node
- * @return true if is reacquiring
- */
- final boolean isOnSyncQueue(Node node) {
- if (node.waitStatus == Node.CONDITION || node.prev == null)
- return false;
- if (node.next != null) // If has successor, it must be on queue
- return true;
- /*
- * node.prev can be non-null, but not yet on queue because
- * the CAS to place it on queue can fail. So we have to
- * traverse from tail to make sure it actually made it. It
- * will always be near the tail in calls to this method, and
- * unless the CAS failed (which is unlikely), it will be
- * there, so we hardly ever traverse much.
- */
- return findNodeFromTail(node);
- }
-
- /**
- * Returns true if node is on sync queue by searching backwards from tail.
- * Called only when needed by isOnSyncQueue.
- * @return true if present
- */
- private boolean findNodeFromTail(Node node) {
- // We check for node first, since it's likely to be at or near tail.
- // tail is known to be non-null, so we could re-order to "save"
- // one null check, but we leave it this way to help the VM.
- for (Node p = tail;;) {
- if (p == node)
- return true;
- if (p == null)
- return false;
- p = p.prev;
- }
- }
-
- /**
- * Transfers a node from a condition queue onto sync queue.
- * Returns true if successful.
- * @param node the node
- * @return true if successfully transferred (else the node was
- * cancelled before signal)
- */
- final boolean transferForSignal(Node node) {
- /*
- * If cannot change waitStatus, the node has been cancelled.
- */
- if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
- return false;
-
- /*
- * Splice onto queue and try to set waitStatus of predecessor to
- * indicate that thread is (probably) waiting. If cancelled or
- * attempt to set waitStatus fails, wake up to resync (in which
- * case the waitStatus can be transiently and harmlessly wrong).
- */
- Node p = enq(node);
- int ws = p.waitStatus;
- if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
- LockSupport.unpark(node.thread);
- return true;
- }
-
- /**
- * Transfers node, if necessary, to sync queue after a cancelled wait.
- * Returns true if thread was cancelled before being signalled.
- *
- * @param node the node
- * @return true if cancelled before the node was signalled
- */
- final boolean transferAfterCancelledWait(Node node) {
- if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
- enq(node);
- return true;
- }
- /*
- * If we lost out to a signal(), then we can't proceed
- * until it finishes its enq(). Cancelling during an
- * incomplete transfer is both rare and transient, so just
- * spin.
- */
- while (!isOnSyncQueue(node))
- Thread.yield();
- return false;
- }
-
- /**
- * Invokes release with current state value; returns saved state.
- * Cancels node and throws exception on failure.
- * @param node the condition node for this wait
- * @return previous sync state
- */
- final int fullyRelease(Node node) {
- try {
- int savedState = getState();
- if (release(savedState))
- return savedState;
- throw new IllegalMonitorStateException();
- } catch (Throwable t) {
- node.waitStatus = Node.CANCELLED;
- throw t;
- }
- }
-
// Instrumentation methods for conditions
/**
@@ -1868,106 +1430,34 @@
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
- private transient Node firstWaiter;
+ private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
- private transient Node lastWaiter;
+ private transient ConditionNode lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
- // Internal methods
-
- /**
- * Adds a new waiter to wait queue.
- * @return its new wait node
- */
- private Node addConditionWaiter() {
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- Node t = lastWaiter;
- // If lastWaiter is cancelled, clean out.
- if (t != null && t.waitStatus != Node.CONDITION) {
- unlinkCancelledWaiters();
- t = lastWaiter;
- }
-
- Node node = new Node(Node.CONDITION);
-
- if (t == null)
- firstWaiter = node;
- else
- t.nextWaiter = node;
- lastWaiter = node;
- return node;
- }
-
- /**
- * Removes and transfers nodes until hit non-cancelled one or
- * null. Split out from signal in part to encourage compilers
- * to inline the case of no waiters.
- * @param first (non-null) the first node on condition queue
- */
- private void doSignal(Node first) {
- do {
- if ( (firstWaiter = first.nextWaiter) == null)
- lastWaiter = null;
- first.nextWaiter = null;
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
+ // Signalling methods
/**
- * Removes and transfers all nodes.
- * @param first (non-null) the first node on condition queue
+ * Removes and transfers one or all waiters to sync queue.
*/
- private void doSignalAll(Node first) {
- lastWaiter = firstWaiter = null;
- do {
- Node next = first.nextWaiter;
- first.nextWaiter = null;
- transferForSignal(first);
+ private void doSignal(ConditionNode first, boolean all) {
+ while (first != null) {
+ ConditionNode next = first.nextWaiter;
+ if ((firstWaiter = next) == null)
+ lastWaiter = null;
+ if ((first.getAndUnsetStatus(COND) & COND) != 0) {
+ enqueue(first);
+ if (!all)
+ break;
+ }
first = next;
- } while (first != null);
- }
-
- /**
- * Unlinks cancelled waiter nodes from condition queue.
- * Called only while holding lock. This is called when
- * cancellation occurred during condition wait, and upon
- * insertion of a new waiter when lastWaiter is seen to have
- * been cancelled. This method is needed to avoid garbage
- * retention in the absence of signals. So even though it may
- * require a full traversal, it comes into play only when
- * timeouts or cancellations occur in the absence of
- * signals. It traverses all nodes rather than stopping at a
- * particular target to unlink all pointers to garbage nodes
- * without requiring many re-traversals during cancellation
- * storms.
- */
- private void unlinkCancelledWaiters() {
- Node t = firstWaiter;
- Node trail = null;
- while (t != null) {
- Node next = t.nextWaiter;
- if (t.waitStatus != Node.CONDITION) {
- t.nextWaiter = null;
- if (trail == null)
- firstWaiter = next;
- else
- trail.nextWaiter = next;
- if (next == null)
- lastWaiter = trail;
- }
- else
- trail = t;
- t = next;
}
}
- // public methods
-
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
@@ -1977,11 +1467,11 @@
* returns {@code false}
*/
public final void signal() {
+ ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
- Node first = firstWaiter;
if (first != null)
- doSignal(first);
+ doSignal(first, false);
}
/**
@@ -1992,11 +1482,72 @@
* returns {@code false}
*/
public final void signalAll() {
+ ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
- Node first = firstWaiter;
if (first != null)
- doSignalAll(first);
+ doSignal(first, true);
+ }
+
+ // Waiting methods
+
+ /**
+ * Adds node to condition list and releases lock.
+ *
+ * @param node the node
+ * @return savedState to reacquire after wait
+ */
+ private int enableWait(ConditionNode node) {
+ if (isHeldExclusively()) {
+ node.waiter = Thread.currentThread();
+ node.setStatusRelaxed(COND | WAITING);
+ ConditionNode last = lastWaiter;
+ if (last == null)
+ firstWaiter = node;
+ else
+ last.nextWaiter = node;
+ lastWaiter = node;
+ int savedState = getState();
+ if (release(savedState))
+ return savedState;
+ }
+ node.status = CANCELLED; // lock not held or inconsistent
+ throw new IllegalMonitorStateException();
+ }
+
+ /**
+ * Returns true if a node that was initially placed on a condition
+ * queue is now ready to reacquire on sync queue.
+ * @param node the node
+ * @return true if is reacquiring
+ */
+ private boolean canReacquire(ConditionNode node) {
+ // check links, not status to avoid enqueue race
+ return node != null && node.prev != null && isEnqueued(node);
+ }
+
+ /**
+ * Unlinks the given node and other non-waiting nodes from
+ * condition queue unless already unlinked.
+ */
+ private void unlinkCancelledWaiters(ConditionNode node) {
+ if (node == null || node.nextWaiter != null || node == lastWaiter) {
+ ConditionNode w = firstWaiter, trail = null;
+ while (w != null) {
+ ConditionNode next = w.nextWaiter;
+ if ((w.status & COND) == 0) {
+ w.nextWaiter = null;
+ if (trail == null)
+ firstWaiter = next;
+ else
+ trail.nextWaiter = next;
+ if (next == null)
+ lastWaiter = trail;
+ } else
+ trail = w;
+ w = next;
+ }
+ }
}
/**
@@ -2011,51 +1562,27 @@
* </ol>
*/
public final void awaitUninterruptibly() {
- Node node = addConditionWaiter();
- int savedState = fullyRelease(node);
+ ConditionNode node = new ConditionNode();
+ int savedState = enableWait(node);
+ LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false;
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
+ while (!canReacquire(node)) {
if (Thread.interrupted())
interrupted = true;
+ else if ((node.status & COND) != 0) {
+ try {
+ ForkJoinPool.managedBlock(node);
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ } else
+ Thread.onSpinWait(); // awoke while enqueuing
}
- if (acquireQueued(node, savedState) || interrupted)
- selfInterrupt();
- }
-
- /*
- * For interruptible waits, we need to track whether to throw
- * InterruptedException, if interrupted while blocked on
- * condition, versus reinterrupt current thread, if
- * interrupted while blocked waiting to re-acquire.
- */
-
- /** Mode meaning to reinterrupt on exit from wait */
- private static final int REINTERRUPT = 1;
- /** Mode meaning to throw InterruptedException on exit from wait */
- private static final int THROW_IE = -1;
-
- /**
- * Checks for interrupt, returning THROW_IE if interrupted
- * before signalled, REINTERRUPT if after signalled, or
- * 0 if not interrupted.
- */
- private int checkInterruptWhileWaiting(Node node) {
- return Thread.interrupted() ?
- (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
- 0;
- }
-
- /**
- * Throws InterruptedException, reinterrupts current thread, or
- * does nothing, depending on mode.
- */
- private void reportInterruptAfterWait(int interruptMode)
- throws InterruptedException {
- if (interruptMode == THROW_IE)
- throw new InterruptedException();
- else if (interruptMode == REINTERRUPT)
- selfInterrupt();
+ LockSupport.setCurrentBlocker(null);
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
/**
@@ -2074,20 +1601,33 @@
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
- Node node = addConditionWaiter();
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
+ ConditionNode node = new ConditionNode();
+ int savedState = enableWait(node);
+ LockSupport.setCurrentBlocker(this); // for back-compatibility
+ boolean interrupted = false, cancelled = false;
+ while (!canReacquire(node)) {
+ if (interrupted |= Thread.interrupted()) {
+ if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
+ break; // else interrupted after signal
+ } else if ((node.status & COND) != 0) {
+ try {
+ ForkJoinPool.managedBlock(node);
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ } else
+ Thread.onSpinWait(); // awoke while enqueuing
}
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
+ LockSupport.setCurrentBlocker(null);
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (interrupted) {
+ if (cancelled) {
+ unlinkCancelledWaiters(node);
+ throw new InterruptedException();
+ }
+ Thread.currentThread().interrupt();
+ }
}
/**
@@ -2107,32 +1647,29 @@
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
- // We don't check for nanosTimeout <= 0L here, to allow
- // awaitNanos(0) as a way to "yield the lock".
- final long deadline = System.nanoTime() + nanosTimeout;
- long initialNanos = nanosTimeout;
- Node node = addConditionWaiter();
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- if (nanosTimeout <= 0L) {
- transferAfterCancelledWait(node);
- break;
- }
- if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanosTimeout);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- nanosTimeout = deadline - System.nanoTime();
+ ConditionNode node = new ConditionNode();
+ int savedState = enableWait(node);
+ long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
+ long deadline = System.nanoTime() + nanos;
+ boolean cancelled = false, interrupted = false;
+ while (!canReacquire(node)) {
+ if ((interrupted |= Thread.interrupted()) ||
+ (nanos = deadline - System.nanoTime()) <= 0L) {
+ if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
+ break;
+ } else
+ LockSupport.parkNanos(this, nanos);
}
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null)
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (cancelled) {
+ unlinkCancelledWaiters(node);
+ if (interrupted)
+ throw new InterruptedException();
+ } else if (interrupted)
+ Thread.currentThread().interrupt();
long remaining = deadline - System.nanoTime(); // avoid overflow
- return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
+ return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
}
/**
@@ -2154,26 +1691,26 @@
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
- Node node = addConditionWaiter();
- int savedState = fullyRelease(node);
- boolean timedout = false;
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- if (System.currentTimeMillis() >= abstime) {
- timedout = transferAfterCancelledWait(node);
- break;
- }
- LockSupport.parkUntil(this, abstime);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
+ ConditionNode node = new ConditionNode();
+ int savedState = enableWait(node);
+ boolean cancelled = false, interrupted = false;
+ while (!canReacquire(node)) {
+ if ((interrupted |= Thread.interrupted()) ||
+ System.currentTimeMillis() >= abstime) {
+ if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
+ break;
+ } else
+ LockSupport.parkUntil(this, abstime);
}
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null)
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- return !timedout;
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (cancelled) {
+ unlinkCancelledWaiters(node);
+ if (interrupted)
+ throw new InterruptedException();
+ } else if (interrupted)
+ Thread.currentThread().interrupt();
+ return !cancelled;
}
/**
@@ -2195,31 +1732,28 @@
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
- // We don't check for nanosTimeout <= 0L here, to allow
- // await(0, unit) as a way to "yield the lock".
- final long deadline = System.nanoTime() + nanosTimeout;
- Node node = addConditionWaiter();
- int savedState = fullyRelease(node);
- boolean timedout = false;
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- if (nanosTimeout <= 0L) {
- timedout = transferAfterCancelledWait(node);
- break;
- }
- if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanosTimeout);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- nanosTimeout = deadline - System.nanoTime();
+ ConditionNode node = new ConditionNode();
+ int savedState = enableWait(node);
+ long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
+ long deadline = System.nanoTime() + nanos;
+ boolean cancelled = false, interrupted = false;
+ while (!canReacquire(node)) {
+ if ((interrupted |= Thread.interrupted()) ||
+ (nanos = deadline - System.nanoTime()) <= 0L) {
+ if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
+ break;
+ } else
+ LockSupport.parkNanos(this, nanos);
}
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null)
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- return !timedout;
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (cancelled) {
+ unlinkCancelledWaiters(node);
+ if (interrupted)
+ throw new InterruptedException();
+ } else if (interrupted)
+ Thread.currentThread().interrupt();
+ return !cancelled;
}
// support for instrumentation
@@ -2245,8 +1779,8 @@
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
- for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
- if (w.waitStatus == Node.CONDITION)
+ for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
+ if ((w.status & COND) != 0)
return true;
}
return false;
@@ -2265,8 +1799,8 @@
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
- for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
- if (w.waitStatus == Node.CONDITION)
+ for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
+ if ((w.status & COND) != 0)
++n;
}
return n;
@@ -2285,9 +1819,9 @@
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<>();
- for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
- if (w.waitStatus == Node.CONDITION) {
- Thread t = w.thread;
+ for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
+ if ((w.status & COND) != 0) {
+ Thread t = w.waiter;
if (t != null)
list.add(t);
}
@@ -2296,39 +1830,16 @@
}
}
- // VarHandle mechanics
- private static final VarHandle STATE;
- private static final VarHandle HEAD;
- private static final VarHandle TAIL;
+ // Unsafe
+ private static final Unsafe U = Unsafe.getUnsafe();
+ private static final long STATE
+ = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state");
+ private static final long HEAD
+ = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head");
+ private static final long TAIL
+ = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail");
static {
- try {
- MethodHandles.Lookup l = MethodHandles.lookup();
- STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
- HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
- TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
- } catch (ReflectiveOperationException e) {
- throw new ExceptionInInitializerError(e);
- }
-
- // Reduce the risk of rare disastrous classloading in first call to
- // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
-
- /**
- * Initializes head and tail fields on first contention.
- */
- private final void initializeSyncQueue() {
- Node h;
- if (HEAD.compareAndSet(this, null, (h = new Node())))
- tail = h;
- }
-
- /**
- * CASes tail field.
- */
- private final boolean compareAndSetTail(Node expect, Node update) {
- return TAIL.compareAndSet(this, expect, update);
- }
}