--- a/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java Wed Mar 25 12:24:30 2009 -0700
+++ b/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java Thu Mar 26 11:59:07 2009 -0700
@@ -166,6 +166,11 @@
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
+ /**
+ * waitStatus value to indicate the next acquireShared should
+ * unconditionally propagate
+ */
+ static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
@@ -180,10 +185,16 @@
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
- * It will not be used as a sync queue node until
- * transferred. (Use of this value here
- * has nothing to do with the other uses
- * of the field, but simplifies mechanics.)
+ * It will not be used as a sync queue node
+ * until transferred, at which time the status
+ * will be set to 0. (Use of this value here has
+ * nothing to do with the other uses of the
+ * field, but simplifies mechanics.)
+ * PROPAGATE: A releaseShared should be propagated to other
+ * nodes. This is set (for head node only) in
+ * doReleaseShared to ensure propagation
+ * continues, even if other operations have
+ * since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
@@ -403,10 +414,13 @@
*/
private void unparkSuccessor(Node node) {
/*
- * Try to clear status in anticipation of signalling. It is
- * OK if this fails or if status is changed by waiting thread.
+ * If status is negative (i.e., possibly needing signal) try
+ * to clear in anticipation of signalling. It is OK if this
+ * fails or if status is changed by waiting thread.
*/
- compareAndSetWaitStatus(node, Node.SIGNAL, 0);
+ int ws = node.waitStatus;
+ if (ws < 0)
+ compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
@@ -426,23 +440,70 @@
}
/**
+ * Release action for shared mode -- signal successor and ensure
+ * propagation. (Note: For exclusive mode, release just amounts
+ * to calling unparkSuccessor of head if it needs signal.)
+ */
+ private void doReleaseShared() {
+ /*
+ * Ensure that a release propagates, even if there are other
+ * in-progress acquires/releases. This proceeds in the usual
+ * way of trying to unparkSuccessor of head if it needs
+ * signal. But if it does not, status is set to PROPAGATE to
+ * ensure that upon release, propagation continues.
+ * Additionally, we must loop in case a new node is added
+ * while we are doing this. Also, unlike other uses of
+ * unparkSuccessor, we need to know if CAS to reset status
+ * fails, if so rechecking.
+ */
+ for (;;) {
+ Node h = head;
+ if (h != null && h != tail) {
+ int ws = h.waitStatus;
+ if (ws == Node.SIGNAL) {
+ if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
+ continue; // loop to recheck cases
+ unparkSuccessor(h);
+ }
+ else if (ws == 0 &&
+ !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
+ continue; // loop on failed CAS
+ }
+ if (h == head) // loop if head changed
+ break;
+ }
+ }
+
+ /**
* Sets head of queue, and checks if successor may be waiting
- * in shared mode, if so propagating if propagate > 0.
+ * in shared mode, if so propagating if either propagate > 0 or
+ * PROPAGATE status was set.
*
- * @param pred the node holding waitStatus for node
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, long propagate) {
+ Node h = head; // Record old head for check below
setHead(node);
- if (propagate > 0 && node.waitStatus != 0) {
- /*
- * Don't bother fully figuring out successor. If it
- * looks null, call unparkSuccessor anyway to be safe.
- */
+ /*
+ * Try to signal next queued node if:
+ * Propagation was indicated by caller,
+ * or was recorded (as h.waitStatus) by a previous operation
+ * (note: this uses sign-check of waitStatus because
+ * PROPAGATE status may transition to SIGNAL.)
+ * and
+ * The next node is waiting in shared mode,
+ * or we don't know, because it appears null
+ *
+ * The conservatism in both of these checks may cause
+ * unnecessary wake-ups, but only when there are multiple
+ * racing acquires/releases, so most need signals now or soon
+ * anyway.
+ */
+ if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
- unparkSuccessor(node);
+ doReleaseShared();
}
}
@@ -465,23 +526,27 @@
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
- // Getting this before setting waitStatus ensures staleness
+ // predNext is the apparent node to unsplice. CASes below will
+ // fail if not, in which case, we lost race vs another cancel
+ // or signal, so no further action is necessary.
Node predNext = pred.next;
- // Can use unconditional write instead of CAS here
+ // Can use unconditional write instead of CAS here.
+ // After this atomic step, other Nodes can skip past us.
+ // Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
- // If we are the tail, remove ourselves
+ // If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
- // If "active" predecessor found...
- if (pred != head
- && (pred.waitStatus == Node.SIGNAL
- || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
- && pred.thread != null) {
-
- // If successor is active, set predecessor's next link
+ // If successor needs signal, try to set pred's next-link
+ // so it will get one. Otherwise wake it up to propagate.
+ int ws;
+ if (pred != head &&
+ ((ws = pred.waitStatus) == Node.SIGNAL ||
+ (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
+ pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
@@ -503,14 +568,14 @@
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int s = pred.waitStatus;
- if (s < 0)
+ int ws = pred.waitStatus;
+ if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
- if (s > 0) {
+ if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
@@ -519,14 +584,14 @@
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
- }
- else
+ } else {
/*
- * Indicate that we need a signal, but don't park yet. Caller
- * will need to retry to make sure it cannot acquire before
- * parking.
+ * waitStatus must be 0 or PROPAGATE. Indicate that we
+ * need a signal, but don't park yet. Caller will need to
+ * retry to make sure it cannot acquire before parking.
*/
- compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
+ compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
+ }
return false;
}
@@ -1046,9 +1111,7 @@
*/
public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
+ doReleaseShared();
return true;
}
return false;
@@ -1390,8 +1453,8 @@
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
- int c = p.waitStatus;
- if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
+ int ws = p.waitStatus;
+ if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}