164 static final int CANCELLED = 1; |
164 static final int CANCELLED = 1; |
165 /** waitStatus value to indicate successor's thread needs unparking */ |
165 /** waitStatus value to indicate successor's thread needs unparking */ |
166 static final int SIGNAL = -1; |
166 static final int SIGNAL = -1; |
167 /** waitStatus value to indicate thread is waiting on condition */ |
167 /** waitStatus value to indicate thread is waiting on condition */ |
168 static final int CONDITION = -2; |
168 static final int CONDITION = -2; |
|
169 /** |
|
170 * waitStatus value to indicate the next acquireShared should |
|
171 * unconditionally propagate |
|
172 */ |
|
173 static final int PROPAGATE = -3; |
169 |
174 |
170 /** |
175 /** |
171 * Status field, taking on only the values: |
176 * Status field, taking on only the values: |
172 * SIGNAL: The successor of this node is (or will soon be) |
177 * SIGNAL: The successor of this node is (or will soon be) |
173 * blocked (via park), so the current node must |
178 * blocked (via park), so the current node must |
178 * on failure, block. |
183 * on failure, block. |
179 * CANCELLED: This node is cancelled due to timeout or interrupt. |
184 * CANCELLED: This node is cancelled due to timeout or interrupt. |
180 * Nodes never leave this state. In particular, |
185 * Nodes never leave this state. In particular, |
181 * a thread with cancelled node never again blocks. |
186 * a thread with cancelled node never again blocks. |
182 * CONDITION: This node is currently on a condition queue. |
187 * CONDITION: This node is currently on a condition queue. |
183 * It will not be used as a sync queue node until |
188 * It will not be used as a sync queue node |
184 * transferred. (Use of this value here |
189 * until transferred, at which time the status |
185 * has nothing to do with the other uses |
190 * will be set to 0. (Use of this value here has |
186 * of the field, but simplifies mechanics.) |
191 * nothing to do with the other uses of the |
|
192 * field, but simplifies mechanics.) |
|
193 * PROPAGATE: A releaseShared should be propagated to other |
|
194 * nodes. This is set (for head node only) in |
|
195 * doReleaseShared to ensure propagation |
|
196 * continues, even if other operations have |
|
197 * since intervened. |
187 * 0: None of the above |
198 * 0: None of the above |
188 * |
199 * |
189 * The values are arranged numerically to simplify use. |
200 * The values are arranged numerically to simplify use. |
190 * Non-negative values mean that a node doesn't need to |
201 * Non-negative values mean that a node doesn't need to |
191 * signal. So, most code doesn't need to check for particular |
202 * signal. So, most code doesn't need to check for particular |
401 * |
412 * |
402 * @param node the node |
413 * @param node the node |
403 */ |
414 */ |
404 private void unparkSuccessor(Node node) { |
415 private void unparkSuccessor(Node node) { |
405 /* |
416 /* |
406 * Try to clear status in anticipation of signalling. It is |
417 * If status is negative (i.e., possibly needing signal) try |
407 * OK if this fails or if status is changed by waiting thread. |
418 * to clear in anticipation of signalling. It is OK if this |
408 */ |
419 * fails or if status is changed by waiting thread. |
409 compareAndSetWaitStatus(node, Node.SIGNAL, 0); |
420 */ |
|
421 int ws = node.waitStatus; |
|
422 if (ws < 0) |
|
423 compareAndSetWaitStatus(node, ws, 0); |
410 |
424 |
411 /* |
425 /* |
412 * Thread to unpark is held in successor, which is normally |
426 * Thread to unpark is held in successor, which is normally |
413 * just the next node. But if cancelled or apparently null, |
427 * just the next node. But if cancelled or apparently null, |
414 * traverse backwards from tail to find the actual |
428 * traverse backwards from tail to find the actual |
424 if (s != null) |
438 if (s != null) |
425 LockSupport.unpark(s.thread); |
439 LockSupport.unpark(s.thread); |
426 } |
440 } |
427 |
441 |
428 /** |
442 /** |
|
443 * Release action for shared mode -- signal successor and ensure |
|
444 * propagation. (Note: For exclusive mode, release just amounts |
|
445 * to calling unparkSuccessor of head if it needs signal.) |
|
446 */ |
|
447 private void doReleaseShared() { |
|
448 /* |
|
449 * Ensure that a release propagates, even if there are other |
|
450 * in-progress acquires/releases. This proceeds in the usual |
|
451 * way of trying to unparkSuccessor of head if it needs |
|
452 * signal. But if it does not, status is set to PROPAGATE to |
|
453 * ensure that upon release, propagation continues. |
|
454 * Additionally, we must loop in case a new node is added |
|
455 * while we are doing this. Also, unlike other uses of |
|
456 * unparkSuccessor, we need to know if CAS to reset status |
|
457 * fails, if so rechecking. |
|
458 */ |
|
459 for (;;) { |
|
460 Node h = head; |
|
461 if (h != null && h != tail) { |
|
462 int ws = h.waitStatus; |
|
463 if (ws == Node.SIGNAL) { |
|
464 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) |
|
465 continue; // loop to recheck cases |
|
466 unparkSuccessor(h); |
|
467 } |
|
468 else if (ws == 0 && |
|
469 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) |
|
470 continue; // loop on failed CAS |
|
471 } |
|
472 if (h == head) // loop if head changed |
|
473 break; |
|
474 } |
|
475 } |
|
476 |
|
477 /** |
429 * Sets head of queue, and checks if successor may be waiting |
478 * Sets head of queue, and checks if successor may be waiting |
430 * in shared mode, if so propagating if propagate > 0. |
479 * in shared mode, if so propagating if either propagate > 0 or |
431 * |
480 * PROPAGATE status was set. |
432 * @param pred the node holding waitStatus for node |
481 * |
433 * @param node the node |
482 * @param node the node |
434 * @param propagate the return value from a tryAcquireShared |
483 * @param propagate the return value from a tryAcquireShared |
435 */ |
484 */ |
436 private void setHeadAndPropagate(Node node, long propagate) { |
485 private void setHeadAndPropagate(Node node, long propagate) { |
|
486 Node h = head; // Record old head for check below |
437 setHead(node); |
487 setHead(node); |
438 if (propagate > 0 && node.waitStatus != 0) { |
488 /* |
439 /* |
489 * Try to signal next queued node if: |
440 * Don't bother fully figuring out successor. If it |
490 * Propagation was indicated by caller, |
441 * looks null, call unparkSuccessor anyway to be safe. |
491 * or was recorded (as h.waitStatus) by a previous operation |
442 */ |
492 * (note: this uses sign-check of waitStatus because |
|
493 * PROPAGATE status may transition to SIGNAL.) |
|
494 * and |
|
495 * The next node is waiting in shared mode, |
|
496 * or we don't know, because it appears null |
|
497 * |
|
498 * The conservatism in both of these checks may cause |
|
499 * unnecessary wake-ups, but only when there are multiple |
|
500 * racing acquires/releases, so most need signals now or soon |
|
501 * anyway. |
|
502 */ |
|
503 if (propagate > 0 || h == null || h.waitStatus < 0) { |
443 Node s = node.next; |
504 Node s = node.next; |
444 if (s == null || s.isShared()) |
505 if (s == null || s.isShared()) |
445 unparkSuccessor(node); |
506 doReleaseShared(); |
446 } |
507 } |
447 } |
508 } |
448 |
509 |
449 // Utilities for various versions of acquire |
510 // Utilities for various versions of acquire |
450 |
511 |
463 // Skip cancelled predecessors |
524 // Skip cancelled predecessors |
464 Node pred = node.prev; |
525 Node pred = node.prev; |
465 while (pred.waitStatus > 0) |
526 while (pred.waitStatus > 0) |
466 node.prev = pred = pred.prev; |
527 node.prev = pred = pred.prev; |
467 |
528 |
468 // Getting this before setting waitStatus ensures staleness |
529 // predNext is the apparent node to unsplice. CASes below will |
|
530 // fail if not, in which case, we lost race vs another cancel |
|
531 // or signal, so no further action is necessary. |
469 Node predNext = pred.next; |
532 Node predNext = pred.next; |
470 |
533 |
471 // Can use unconditional write instead of CAS here |
534 // Can use unconditional write instead of CAS here. |
|
535 // After this atomic step, other Nodes can skip past us. |
|
536 // Before, we are free of interference from other threads. |
472 node.waitStatus = Node.CANCELLED; |
537 node.waitStatus = Node.CANCELLED; |
473 |
538 |
474 // If we are the tail, remove ourselves |
539 // If we are the tail, remove ourselves. |
475 if (node == tail && compareAndSetTail(node, pred)) { |
540 if (node == tail && compareAndSetTail(node, pred)) { |
476 compareAndSetNext(pred, predNext, null); |
541 compareAndSetNext(pred, predNext, null); |
477 } else { |
542 } else { |
478 // If "active" predecessor found... |
543 // If successor needs signal, try to set pred's next-link |
479 if (pred != head |
544 // so it will get one. Otherwise wake it up to propagate. |
480 && (pred.waitStatus == Node.SIGNAL |
545 int ws; |
481 || compareAndSetWaitStatus(pred, 0, Node.SIGNAL)) |
546 if (pred != head && |
482 && pred.thread != null) { |
547 ((ws = pred.waitStatus) == Node.SIGNAL || |
483 |
548 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && |
484 // If successor is active, set predecessor's next link |
549 pred.thread != null) { |
485 Node next = node.next; |
550 Node next = node.next; |
486 if (next != null && next.waitStatus <= 0) |
551 if (next != null && next.waitStatus <= 0) |
487 compareAndSetNext(pred, predNext, next); |
552 compareAndSetNext(pred, predNext, next); |
488 } else { |
553 } else { |
489 unparkSuccessor(node); |
554 unparkSuccessor(node); |
501 * @param pred node's predecessor holding status |
566 * @param pred node's predecessor holding status |
502 * @param node the node |
567 * @param node the node |
503 * @return {@code true} if thread should block |
568 * @return {@code true} if thread should block |
504 */ |
569 */ |
505 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
570 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
506 int s = pred.waitStatus; |
571 int ws = pred.waitStatus; |
507 if (s < 0) |
572 if (ws == Node.SIGNAL) |
508 /* |
573 /* |
509 * This node has already set status asking a release |
574 * This node has already set status asking a release |
510 * to signal it, so it can safely park. |
575 * to signal it, so it can safely park. |
511 */ |
576 */ |
512 return true; |
577 return true; |
513 if (s > 0) { |
578 if (ws > 0) { |
514 /* |
579 /* |
515 * Predecessor was cancelled. Skip over predecessors and |
580 * Predecessor was cancelled. Skip over predecessors and |
516 * indicate retry. |
581 * indicate retry. |
517 */ |
582 */ |
518 do { |
583 do { |
519 node.prev = pred = pred.prev; |
584 node.prev = pred = pred.prev; |
520 } while (pred.waitStatus > 0); |
585 } while (pred.waitStatus > 0); |
521 pred.next = node; |
586 pred.next = node; |
522 } |
587 } else { |
523 else |
|
524 /* |
588 /* |
525 * Indicate that we need a signal, but don't park yet. Caller |
589 * waitStatus must be 0 or PROPAGATE. Indicate that we |
526 * will need to retry to make sure it cannot acquire before |
590 * need a signal, but don't park yet. Caller will need to |
527 * parking. |
591 * retry to make sure it cannot acquire before parking. |
528 */ |
592 */ |
529 compareAndSetWaitStatus(pred, 0, Node.SIGNAL); |
593 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); |
|
594 } |
530 return false; |
595 return false; |
531 } |
596 } |
532 |
597 |
533 /** |
598 /** |
534 * Convenience method to interrupt current thread. |
599 * Convenience method to interrupt current thread. |
1388 * indicate that thread is (probably) waiting. If cancelled or |
1451 * indicate that thread is (probably) waiting. If cancelled or |
1389 * attempt to set waitStatus fails, wake up to resync (in which |
1452 * attempt to set waitStatus fails, wake up to resync (in which |
1390 * case the waitStatus can be transiently and harmlessly wrong). |
1453 * case the waitStatus can be transiently and harmlessly wrong). |
1391 */ |
1454 */ |
1392 Node p = enq(node); |
1455 Node p = enq(node); |
1393 int c = p.waitStatus; |
1456 int ws = p.waitStatus; |
1394 if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL)) |
1457 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) |
1395 LockSupport.unpark(node.thread); |
1458 LockSupport.unpark(node.thread); |
1396 return true; |
1459 return true; |
1397 } |
1460 } |
1398 |
1461 |
1399 /** |
1462 /** |