branch | datagramsocketimpl-branch |
changeset 58678 | 9cf78a70fa4f |
parent 49565 | b5705ade8c8d |
child 58679 | 9c3209ff7550 |
58677:13588c901957 | 58678:9cf78a70fa4f |
---|---|
33 * http://creativecommons.org/publicdomain/zero/1.0/ |
33 * http://creativecommons.org/publicdomain/zero/1.0/ |
34 */ |
34 */ |
35 |
35 |
36 package java.util.concurrent.locks; |
36 package java.util.concurrent.locks; |
37 |
37 |
38 import java.lang.invoke.MethodHandles; |
|
39 import java.lang.invoke.VarHandle; |
|
40 import java.util.ArrayList; |
38 import java.util.ArrayList; |
41 import java.util.Collection; |
39 import java.util.Collection; |
42 import java.util.Date; |
40 import java.util.Date; |
43 import java.util.concurrent.TimeUnit; |
41 import java.util.concurrent.TimeUnit; |
44 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node; |
42 import java.util.concurrent.ForkJoinPool; |
43 import jdk.internal.misc.Unsafe; |
|
45 |
44 |
46 /** |
45 /** |
47 * A version of {@link AbstractQueuedSynchronizer} in |
46 * A version of {@link AbstractQueuedSynchronizer} in |
48 * which synchronization state is maintained as a {@code long}. |
47 * which synchronization state is maintained as a {@code long}. |
49 * This class has exactly the same structure, properties, and methods |
48 * This class has exactly the same structure, properties, and methods |
71 * exactly cloned from AbstractQueuedSynchronizer, replacing class |
70 * exactly cloned from AbstractQueuedSynchronizer, replacing class |
72 * name and changing ints related with sync state to longs. Please |
71 * name and changing ints related with sync state to longs. Please |
73 * keep it that way. |
72 * keep it that way. |
74 */ |
73 */ |
75 |
74 |
76 /** |
75 // Node status bits, also used as argument and return values |
77 * Creates a new {@code AbstractQueuedLongSynchronizer} instance |
76 static final int WAITING = 1; // must be 1 |
78 * with initial synchronization state of zero. |
77 static final int CANCELLED = 0x80000000; // must be negative |
79 */ |
78 static final int COND = 2; // in a condition wait |
80 protected AbstractQueuedLongSynchronizer() { } |
79 |
81 |
80 /** CLH Nodes */ |
82 /** |
81 abstract static class Node { |
83 * Head of the wait queue, lazily initialized. Except for |
82 volatile Node prev; // initially attached via casTail |
84 * initialization, it is modified only via method setHead. Note: |
83 volatile Node next; // visibly nonnull when signallable |
85 * If head exists, its waitStatus is guaranteed not to be |
84 Thread waiter; // visibly nonnull when enqueued |
86 * CANCELLED. |
85 volatile int status; // written by owner, atomic bit ops by others |
86 |
|
87 // methods for atomic operations |
|
88 final boolean casPrev(Node c, Node v) { // for cleanQueue |
|
89 return U.weakCompareAndSetReference(this, PREV, c, v); |
|
90 } |
|
91 final boolean casNext(Node c, Node v) { // for cleanQueue |
|
92 return U.weakCompareAndSetReference(this, NEXT, c, v); |
|
93 } |
|
94 final int getAndUnsetStatus(int v) { // for signalling |
|
95 return U.getAndBitwiseAndInt(this, STATUS, ~v); |
|
96 } |
|
97 final void setPrevRelaxed(Node p) { // for off-queue assignment |
|
98 U.putReference(this, PREV, p); |
|
99 } |
|
100 final void setStatusRelaxed(int s) { // for off-queue assignment |
|
101 U.putInt(this, STATUS, s); |
|
102 } |
|
103 final void clearStatus() { // for reducing unneeded signals |
|
104 U.putIntOpaque(this, STATUS, 0); |
|
105 } |
|
106 |
|
107 private static final long STATUS |
|
108 = U.objectFieldOffset(Node.class, "status"); |
|
109 private static final long NEXT |
|
110 = U.objectFieldOffset(Node.class, "next"); |
|
111 private static final long PREV |
|
112 = U.objectFieldOffset(Node.class, "prev"); |
|
113 } |
|
114 |
|
115 // Concrete classes tagged by type |
|
116 static final class ExclusiveNode extends Node { } |
|
117 static final class SharedNode extends Node { } |
|
118 |
|
119 static final class ConditionNode extends Node |
|
120 implements ForkJoinPool.ManagedBlocker { |
|
121 ConditionNode nextWaiter; // link to next waiting node |
|
122 |
|
123 /** |
|
124 * Allows Conditions to be used in ForkJoinPools without |
|
125 * risking fixed pool exhaustion. This is usable only for |
|
126 * untimed Condition waits, not timed versions. |
|
127 */ |
|
128 public final boolean isReleasable() { |
|
129 return status <= 1 || Thread.currentThread().isInterrupted(); |
|
130 } |
|
131 |
|
132 public final boolean block() { |
|
133 while (!isReleasable()) LockSupport.park(); |
|
134 return true; |
|
135 } |
|
136 } |
|
137 |
|
138 /** |
|
139 * Head of the wait queue, lazily initialized. |
|
87 */ |
140 */ |
88 private transient volatile Node head; |
141 private transient volatile Node head; |
89 |
142 |
90 /** |
143 /** |
91 * Tail of the wait queue, lazily initialized. Modified only via |
144 * Tail of the wait queue. After initialization, modified only via casTail. |
92 * method enq to add new wait node. |
|
93 */ |
145 */ |
94 private transient volatile Node tail; |
146 private transient volatile Node tail; |
95 |
147 |
96 /** |
148 /** |
97 * The synchronization state. |
149 * The synchronization state. |
111 * Sets the value of synchronization state. |
163 * Sets the value of synchronization state. |
112 * This operation has memory semantics of a {@code volatile} write. |
164 * This operation has memory semantics of a {@code volatile} write. |
113 * @param newState the new state value |
165 * @param newState the new state value |
114 */ |
166 */ |
115 protected final void setState(long newState) { |
167 protected final void setState(long newState) { |
116 // See JDK-8180620: Clarify VarHandle mixed-access subtleties |
168 state = newState; |
117 STATE.setVolatile(this, newState); |
|
118 } |
169 } |
119 |
170 |
120 /** |
171 /** |
121 * Atomically sets synchronization state to the given updated |
172 * Atomically sets synchronization state to the given updated |
122 * value if the current state value equals the expected value. |
173 * value if the current state value equals the expected value. |
127 * @param update the new value |
178 * @param update the new value |
128 * @return {@code true} if successful. False return indicates that the actual |
179 * @return {@code true} if successful. False return indicates that the actual |
129 * value was not equal to the expected value. |
180 * value was not equal to the expected value. |
130 */ |
181 */ |
131 protected final boolean compareAndSetState(long expect, long update) { |
182 protected final boolean compareAndSetState(long expect, long update) { |
132 return STATE.compareAndSet(this, expect, update); |
183 return U.compareAndSetLong(this, STATE, expect, update); |
133 } |
184 } |
134 |
185 |
135 // Queuing utilities |
186 // Queuing utilities |
136 |
187 |
137 /** |
188 private boolean casTail(Node c, Node v) { |
138 * The number of nanoseconds for which it is faster to spin |
189 return U.compareAndSetReference(this, TAIL, c, v); |
139 * rather than to use timed park. A rough estimate suffices |
190 } |
140 * to improve responsiveness with very short timeouts. |
191 |
141 */ |
192 /** tries once to CAS a new dummy node for head */ |
142 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; |
193 private void tryInitializeHead() { |
143 |
194 Node h = new ExclusiveNode(); |
144 /** |
195 if (U.compareAndSetReference(this, HEAD, null, h)) |
145 * Inserts node into queue, initializing if necessary. See picture above. |
196 tail = h; |
146 * @param node the node to insert |
197 } |
147 * @return node's predecessor |
198 |
148 */ |
199 /** |
149 private Node enq(Node node) { |
200 * Enqueues the node unless null. (Currently used only for |
201 * ConditionNodes; other cases are interleaved with acquires.) |
|
202 */ |
|
203 final void enqueue(Node node) { |
|
204 if (node != null) { |
|
205 for (;;) { |
|
206 Node t = tail; |
|
207 node.setPrevRelaxed(t); // avoid unnecessary fence |
|
208 if (t == null) // initialize |
|
209 tryInitializeHead(); |
|
210 else if (casTail(t, node)) { |
|
211 t.next = node; |
|
212 if (t.status < 0) // wake up to clean link |
|
213 LockSupport.unpark(node.waiter); |
|
214 break; |
|
215 } |
|
216 } |
|
217 } |
|
218 } |
|
219 |
|
220 /** Returns true if node is found in traversal from tail */ |
|
221 final boolean isEnqueued(Node node) { |
|
222 for (Node t = tail; t != null; t = t.prev) |
|
223 if (t == node) |
|
224 return true; |
|
225 return false; |
|
226 } |
|
227 |
|
228 /** |
|
229 * Wakes up the successor of given node, if one exists, and unsets its |
|
230 * WAITING status to avoid park race. This may fail to wake up an |
|
231 * eligible thread when one or more have been cancelled, but |
|
232 * cancelAcquire ensures liveness. |
|
233 */ |
|
234 private static void signalNext(Node h) { |
|
235 Node s; |
|
236 if (h != null && (s = h.next) != null && s.status != 0) { |
|
237 s.getAndUnsetStatus(WAITING); |
|
238 LockSupport.unpark(s.waiter); |
|
239 } |
|
240 } |
|
241 |
|
242 /** Wakes up the given node if in shared mode */ |
|
243 private static void signalNextIfShared(Node h) { |
|
244 Node s; |
|
245 if (h != null && (s = h.next) != null && |
|
246 (s instanceof SharedNode) && s.status != 0) { |
|
247 s.getAndUnsetStatus(WAITING); |
|
248 LockSupport.unpark(s.waiter); |
|
249 } |
|
250 } |
|
251 |
|
252 /** |
|
253 * Main acquire method, invoked by all exported acquire methods. |
|
254 * |
|
255 * @param node null unless a reacquiring Condition |
|
256 * @param arg the acquire argument |
|
257 * @param shared true if shared mode else exclusive |
|
258 * @param interruptible if abort and return negative on interrupt |
|
259 * @param timed if true use timed waits |
|
260 * @param time if timed, the System.nanoTime value to timeout |
|
261 * @return positive if acquired, 0 if timed out, negative if interrupted |
|
262 */ |
|
263 final int acquire(Node node, long arg, boolean shared, |
|
264 boolean interruptible, boolean timed, long time) { |
|
265 Thread current = Thread.currentThread(); |
|
266 byte spins = 0, postSpins = 0; // retries upon unpark of first thread |
|
267 boolean interrupted = false, first = false; |
|
268 Node pred = null; // predecessor of node when enqueued |
|
269 |
|
270 /* |
|
271 * Repeatedly: |
|
272 * Check if node now first |
|
273 * if so, ensure head stable, else ensure valid predecessor |
|
274 * if node is first or not yet enqueued, try acquiring |
|
275 * else if node not yet created, create it |
|
276 * else if not yet enqueued, try once to enqueue |
|
277 * else if woken from park, retry (up to postSpins times) |
|
278 * else if WAITING status not set, set and retry |
|
279 * else park and clear WAITING status, and check cancellation |
|
280 */ |
|
281 |
|
150 for (;;) { |
282 for (;;) { |
151 Node oldTail = tail; |
283 if (!first && (pred = (node == null) ? null : node.prev) != null && |
152 if (oldTail != null) { |
284 !(first = (head == pred))) { |
153 node.setPrevRelaxed(oldTail); |
285 if (pred.status < 0) { |
154 if (compareAndSetTail(oldTail, node)) { |
286 cleanQueue(); // predecessor cancelled |
155 oldTail.next = node; |
287 continue; |
156 return oldTail; |
288 } else if (pred.prev == null) { |
289 Thread.onSpinWait(); // ensure serialization |
|
290 continue; |
|
157 } |
291 } |
292 } |
|
293 if (first || pred == null) { |
|
294 boolean acquired; |
|
295 try { |
|
296 if (shared) |
|
297 acquired = (tryAcquireShared(arg) >= 0); |
|
298 else |
|
299 acquired = tryAcquire(arg); |
|
300 } catch (Throwable ex) { |
|
301 cancelAcquire(node, interrupted, false); |
|
302 throw ex; |
|
303 } |
|
304 if (acquired) { |
|
305 if (first) { |
|
306 node.prev = null; |
|
307 head = node; |
|
308 pred.next = null; |
|
309 node.waiter = null; |
|
310 if (shared) |
|
311 signalNextIfShared(node); |
|
312 if (interrupted) |
|
313 current.interrupt(); |
|
314 } |
|
315 return 1; |
|
316 } |
|
317 } |
|
318 if (node == null) { // allocate; retry before enqueue |
|
319 if (shared) |
|
320 node = new SharedNode(); |
|
321 else |
|
322 node = new ExclusiveNode(); |
|
323 } else if (pred == null) { // try to enqueue |
|
324 node.waiter = current; |
|
325 Node t = tail; |
|
326 node.setPrevRelaxed(t); // avoid unnecessary fence |
|
327 if (t == null) |
|
328 tryInitializeHead(); |
|
329 else if (!casTail(t, node)) |
|
330 node.setPrevRelaxed(null); // back out |
|
331 else |
|
332 t.next = node; |
|
333 } else if (first && spins != 0) { |
|
334 --spins; // reduce unfairness on rewaits |
|
335 Thread.onSpinWait(); |
|
336 } else if (node.status == 0) { |
|
337 node.status = WAITING; // enable signal and recheck |
|
158 } else { |
338 } else { |
159 initializeSyncQueue(); |
339 long nanos; |
160 } |
340 spins = postSpins = (byte)((postSpins << 1) | 1); |
161 } |
341 if (!timed) |
162 } |
342 LockSupport.park(this); |
163 |
343 else if ((nanos = time - System.nanoTime()) > 0L) |
164 /** |
344 LockSupport.parkNanos(this, nanos); |
165 * Creates and enqueues node for current thread and given mode. |
345 else |
166 * |
346 break; |
167 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared |
347 node.clearStatus(); |
168 * @return the new node |
348 if ((interrupted |= Thread.interrupted()) && interruptible) |
169 */ |
349 break; |
170 private Node addWaiter(Node mode) { |
350 } |
171 Node node = new Node(mode); |
351 } |
172 |
352 return cancelAcquire(node, interrupted, interruptible); |
173 for (;;) { |
353 } |
174 Node oldTail = tail; |
354 |
175 if (oldTail != null) { |
355 /** |
176 node.setPrevRelaxed(oldTail); |
356 * Possibly repeatedly traverses from tail, unsplicing cancelled |
177 if (compareAndSetTail(oldTail, node)) { |
357 * nodes until none are found. |
178 oldTail.next = node; |
358 */ |
179 return node; |
359 private void cleanQueue() { |
360 for (;;) { // restart point |
|
361 for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples |
|
362 if (q == null || (p = q.prev) == null) |
|
363 return; // end of list |
|
364 if (s == null ? tail != q : (s.prev != q || s.status < 0)) |
|
365 break; // inconsistent |
|
366 if (q.status < 0) { // cancelled |
|
367 if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && |
|
368 q.prev == p) { |
|
369 p.casNext(q, s); // OK if fails |
|
370 if (p.prev == null) |
|
371 signalNext(p); |
|
372 } |
|
373 break; |
|
180 } |
374 } |
181 } else { |
375 if ((n = p.next) != q) { // help finish |
182 initializeSyncQueue(); |
376 if (n != null && q.prev == p) { |
183 } |
377 p.casNext(n, q); |
184 } |
378 if (p.prev == null) |
185 } |
379 signalNext(p); |
186 |
380 } |
187 /** |
381 break; |
188 * Sets head of queue to be node, thus dequeuing. Called only by |
|
189 * acquire methods. Also nulls out unused fields for sake of GC |
|
190 * and to suppress unnecessary signals and traversals. |
|
191 * |
|
192 * @param node the node |
|
193 */ |
|
194 private void setHead(Node node) { |
|
195 head = node; |
|
196 node.thread = null; |
|
197 node.prev = null; |
|
198 } |
|
199 |
|
200 /** |
|
201 * Wakes up node's successor, if one exists. |
|
202 * |
|
203 * @param node the node |
|
204 */ |
|
205 private void unparkSuccessor(Node node) { |
|
206 /* |
|
207 * If status is negative (i.e., possibly needing signal) try |
|
208 * to clear in anticipation of signalling. It is OK if this |
|
209 * fails or if status is changed by waiting thread. |
|
210 */ |
|
211 int ws = node.waitStatus; |
|
212 if (ws < 0) |
|
213 node.compareAndSetWaitStatus(ws, 0); |
|
214 |
|
215 /* |
|
216 * Thread to unpark is held in successor, which is normally |
|
217 * just the next node. But if cancelled or apparently null, |
|
218 * traverse backwards from tail to find the actual |
|
219 * non-cancelled successor. |
|
220 */ |
|
221 Node s = node.next; |
|
222 if (s == null || s.waitStatus > 0) { |
|
223 s = null; |
|
224 for (Node p = tail; p != node && p != null; p = p.prev) |
|
225 if (p.waitStatus <= 0) |
|
226 s = p; |
|
227 } |
|
228 if (s != null) |
|
229 LockSupport.unpark(s.thread); |
|
230 } |
|
231 |
|
232 /** |
|
233 * Release action for shared mode -- signals successor and ensures |
|
234 * propagation. (Note: For exclusive mode, release just amounts |
|
235 * to calling unparkSuccessor of head if it needs signal.) |
|
236 */ |
|
237 private void doReleaseShared() { |
|
238 /* |
|
239 * Ensure that a release propagates, even if there are other |
|
240 * in-progress acquires/releases. This proceeds in the usual |
|
241 * way of trying to unparkSuccessor of head if it needs |
|
242 * signal. But if it does not, status is set to PROPAGATE to |
|
243 * ensure that upon release, propagation continues. |
|
244 * Additionally, we must loop in case a new node is added |
|
245 * while we are doing this. Also, unlike other uses of |
|
246 * unparkSuccessor, we need to know if CAS to reset status |
|
247 * fails, if so rechecking. |
|
248 */ |
|
249 for (;;) { |
|
250 Node h = head; |
|
251 if (h != null && h != tail) { |
|
252 int ws = h.waitStatus; |
|
253 if (ws == Node.SIGNAL) { |
|
254 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) |
|
255 continue; // loop to recheck cases |
|
256 unparkSuccessor(h); |
|
257 } |
382 } |
258 else if (ws == 0 && |
383 s = q; |
259 !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) |
384 q = q.prev; |
260 continue; // loop on failed CAS |
385 } |
261 } |
386 } |
262 if (h == head) // loop if head changed |
387 } |
263 break; |
|
264 } |
|
265 } |
|
266 |
|
267 /** |
|
268 * Sets head of queue, and checks if successor may be waiting |
|
269 * in shared mode, if so propagating if either propagate > 0 or |
|
270 * PROPAGATE status was set. |
|
271 * |
|
272 * @param node the node |
|
273 * @param propagate the return value from a tryAcquireShared |
|
274 */ |
|
275 private void setHeadAndPropagate(Node node, long propagate) { |
|
276 Node h = head; // Record old head for check below |
|
277 setHead(node); |
|
278 /* |
|
279 * Try to signal next queued node if: |
|
280 * Propagation was indicated by caller, |
|
281 * or was recorded (as h.waitStatus either before |
|
282 * or after setHead) by a previous operation |
|
283 * (note: this uses sign-check of waitStatus because |
|
284 * PROPAGATE status may transition to SIGNAL.) |
|
285 * and |
|
286 * The next node is waiting in shared mode, |
|
287 * or we don't know, because it appears null |
|
288 * |
|
289 * The conservatism in both of these checks may cause |
|
290 * unnecessary wake-ups, but only when there are multiple |
|
291 * racing acquires/releases, so most need signals now or soon |
|
292 * anyway. |
|
293 */ |
|
294 if (propagate > 0 || h == null || h.waitStatus < 0 || |
|
295 (h = head) == null || h.waitStatus < 0) { |
|
296 Node s = node.next; |
|
297 if (s == null || s.isShared()) |
|
298 doReleaseShared(); |
|
299 } |
|
300 } |
|
301 |
|
302 // Utilities for various versions of acquire |
|
303 |
388 |
304 /** |
389 /** |
305 * Cancels an ongoing attempt to acquire. |
390 * Cancels an ongoing attempt to acquire. |
306 * |
391 * |
307 * @param node the node |
392 * @param node the node (may be null if cancelled before enqueuing) |
308 */ |
393 * @param interrupted true if thread interrupted |
309 private void cancelAcquire(Node node) { |
394 * @param interruptible if should report interruption vs reset |
310 // Ignore if node doesn't exist |
395 */ |
311 if (node == null) |
396 private int cancelAcquire(Node node, boolean interrupted, |
312 return; |
397 boolean interruptible) { |
313 |
398 if (node != null) { |
314 node.thread = null; |
399 node.waiter = null; |
315 |
400 node.status = CANCELLED; |
316 // Skip cancelled predecessors |
401 if (node.prev != null) |
317 Node pred = node.prev; |
402 cleanQueue(); |
318 while (pred.waitStatus > 0) |
403 } |
319 node.prev = pred = pred.prev; |
404 if (interrupted) { |
320 |
405 if (interruptible) |
321 // predNext is the apparent node to unsplice. CASes below will |
406 return CANCELLED; |
322 // fail if not, in which case, we lost race vs another cancel |
407 else |
323 // or signal, so no further action is necessary, although with |
408 Thread.currentThread().interrupt(); |
324 // a possibility that a cancelled node may transiently remain |
409 } |
325 // reachable. |
410 return 0; |
326 Node predNext = pred.next; |
|
327 |
|
328 // Can use unconditional write instead of CAS here. |
|
329 // After this atomic step, other Nodes can skip past us. |
|
330 // Before, we are free of interference from other threads. |
|
331 node.waitStatus = Node.CANCELLED; |
|
332 |
|
333 // If we are the tail, remove ourselves. |
|
334 if (node == tail && compareAndSetTail(node, pred)) { |
|
335 pred.compareAndSetNext(predNext, null); |
|
336 } else { |
|
337 // If successor needs signal, try to set pred's next-link |
|
338 // so it will get one. Otherwise wake it up to propagate. |
|
339 int ws; |
|
340 if (pred != head && |
|
341 ((ws = pred.waitStatus) == Node.SIGNAL || |
|
342 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && |
|
343 pred.thread != null) { |
|
344 Node next = node.next; |
|
345 if (next != null && next.waitStatus <= 0) |
|
346 pred.compareAndSetNext(predNext, next); |
|
347 } else { |
|
348 unparkSuccessor(node); |
|
349 } |
|
350 |
|
351 node.next = node; // help GC |
|
352 } |
|
353 } |
|
354 |
|
355 /** |
|
356 * Checks and updates status for a node that failed to acquire. |
|
357 * Returns true if thread should block. This is the main signal |
|
358 * control in all acquire loops. Requires that pred == node.prev. |
|
359 * |
|
360 * @param pred node's predecessor holding status |
|
361 * @param node the node |
|
362 * @return {@code true} if thread should block |
|
363 */ |
|
364 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
|
365 int ws = pred.waitStatus; |
|
366 if (ws == Node.SIGNAL) |
|
367 /* |
|
368 * This node has already set status asking a release |
|
369 * to signal it, so it can safely park. |
|
370 */ |
|
371 return true; |
|
372 if (ws > 0) { |
|
373 /* |
|
374 * Predecessor was cancelled. Skip over predecessors and |
|
375 * indicate retry. |
|
376 */ |
|
377 do { |
|
378 node.prev = pred = pred.prev; |
|
379 } while (pred.waitStatus > 0); |
|
380 pred.next = node; |
|
381 } else { |
|
382 /* |
|
383 * waitStatus must be 0 or PROPAGATE. Indicate that we |
|
384 * need a signal, but don't park yet. Caller will need to |
|
385 * retry to make sure it cannot acquire before parking. |
|
386 */ |
|
387 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); |
|
388 } |
|
389 return false; |
|
390 } |
|
391 |
|
392 /** |
|
393 * Convenience method to interrupt current thread. |
|
394 */ |
|
395 static void selfInterrupt() { |
|
396 Thread.currentThread().interrupt(); |
|
397 } |
|
398 |
|
399 /** |
|
400 * Convenience method to park and then check if interrupted. |
|
401 * |
|
402 * @return {@code true} if interrupted |
|
403 */ |
|
404 private final boolean parkAndCheckInterrupt() { |
|
405 LockSupport.park(this); |
|
406 return Thread.interrupted(); |
|
407 } |
|
408 |
|
409 /* |
|
410 * Various flavors of acquire, varying in exclusive/shared and |
|
411 * control modes. Each is mostly the same, but annoyingly |
|
412 * different. Only a little bit of factoring is possible due to |
|
413 * interactions of exception mechanics (including ensuring that we |
|
414 * cancel if tryAcquire throws exception) and other control, at |
|
415 * least not without hurting performance too much. |
|
416 */ |
|
417 |
|
418 /** |
|
419 * Acquires in exclusive uninterruptible mode for thread already in |
|
420 * queue. Used by condition wait methods as well as acquire. |
|
421 * |
|
422 * @param node the node |
|
423 * @param arg the acquire argument |
|
424 * @return {@code true} if interrupted while waiting |
|
425 */ |
|
426 final boolean acquireQueued(final Node node, long arg) { |
|
427 boolean interrupted = false; |
|
428 try { |
|
429 for (;;) { |
|
430 final Node p = node.predecessor(); |
|
431 if (p == head && tryAcquire(arg)) { |
|
432 setHead(node); |
|
433 p.next = null; // help GC |
|
434 return interrupted; |
|
435 } |
|
436 if (shouldParkAfterFailedAcquire(p, node)) |
|
437 interrupted |= parkAndCheckInterrupt(); |
|
438 } |
|
439 } catch (Throwable t) { |
|
440 cancelAcquire(node); |
|
441 if (interrupted) |
|
442 selfInterrupt(); |
|
443 throw t; |
|
444 } |
|
445 } |
|
446 |
|
447 /** |
|
448 * Acquires in exclusive interruptible mode. |
|
449 * @param arg the acquire argument |
|
450 */ |
|
451 private void doAcquireInterruptibly(long arg) |
|
452 throws InterruptedException { |
|
453 final Node node = addWaiter(Node.EXCLUSIVE); |
|
454 try { |
|
455 for (;;) { |
|
456 final Node p = node.predecessor(); |
|
457 if (p == head && tryAcquire(arg)) { |
|
458 setHead(node); |
|
459 p.next = null; // help GC |
|
460 return; |
|
461 } |
|
462 if (shouldParkAfterFailedAcquire(p, node) && |
|
463 parkAndCheckInterrupt()) |
|
464 throw new InterruptedException(); |
|
465 } |
|
466 } catch (Throwable t) { |
|
467 cancelAcquire(node); |
|
468 throw t; |
|
469 } |
|
470 } |
|
471 |
|
472 /** |
|
473 * Acquires in exclusive timed mode. |
|
474 * |
|
475 * @param arg the acquire argument |
|
476 * @param nanosTimeout max wait time |
|
477 * @return {@code true} if acquired |
|
478 */ |
|
479 private boolean doAcquireNanos(long arg, long nanosTimeout) |
|
480 throws InterruptedException { |
|
481 if (nanosTimeout <= 0L) |
|
482 return false; |
|
483 final long deadline = System.nanoTime() + nanosTimeout; |
|
484 final Node node = addWaiter(Node.EXCLUSIVE); |
|
485 try { |
|
486 for (;;) { |
|
487 final Node p = node.predecessor(); |
|
488 if (p == head && tryAcquire(arg)) { |
|
489 setHead(node); |
|
490 p.next = null; // help GC |
|
491 return true; |
|
492 } |
|
493 nanosTimeout = deadline - System.nanoTime(); |
|
494 if (nanosTimeout <= 0L) { |
|
495 cancelAcquire(node); |
|
496 return false; |
|
497 } |
|
498 if (shouldParkAfterFailedAcquire(p, node) && |
|
499 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) |
|
500 LockSupport.parkNanos(this, nanosTimeout); |
|
501 if (Thread.interrupted()) |
|
502 throw new InterruptedException(); |
|
503 } |
|
504 } catch (Throwable t) { |
|
505 cancelAcquire(node); |
|
506 throw t; |
|
507 } |
|
508 } |
|
509 |
|
510 /** |
|
511 * Acquires in shared uninterruptible mode. |
|
512 * @param arg the acquire argument |
|
513 */ |
|
514 private void doAcquireShared(long arg) { |
|
515 final Node node = addWaiter(Node.SHARED); |
|
516 boolean interrupted = false; |
|
517 try { |
|
518 for (;;) { |
|
519 final Node p = node.predecessor(); |
|
520 if (p == head) { |
|
521 long r = tryAcquireShared(arg); |
|
522 if (r >= 0) { |
|
523 setHeadAndPropagate(node, r); |
|
524 p.next = null; // help GC |
|
525 return; |
|
526 } |
|
527 } |
|
528 if (shouldParkAfterFailedAcquire(p, node)) |
|
529 interrupted |= parkAndCheckInterrupt(); |
|
530 } |
|
531 } catch (Throwable t) { |
|
532 cancelAcquire(node); |
|
533 throw t; |
|
534 } finally { |
|
535 if (interrupted) |
|
536 selfInterrupt(); |
|
537 } |
|
538 } |
|
539 |
|
540 /** |
|
541 * Acquires in shared interruptible mode. |
|
542 * @param arg the acquire argument |
|
543 */ |
|
544 private void doAcquireSharedInterruptibly(long arg) |
|
545 throws InterruptedException { |
|
546 final Node node = addWaiter(Node.SHARED); |
|
547 try { |
|
548 for (;;) { |
|
549 final Node p = node.predecessor(); |
|
550 if (p == head) { |
|
551 long r = tryAcquireShared(arg); |
|
552 if (r >= 0) { |
|
553 setHeadAndPropagate(node, r); |
|
554 p.next = null; // help GC |
|
555 return; |
|
556 } |
|
557 } |
|
558 if (shouldParkAfterFailedAcquire(p, node) && |
|
559 parkAndCheckInterrupt()) |
|
560 throw new InterruptedException(); |
|
561 } |
|
562 } catch (Throwable t) { |
|
563 cancelAcquire(node); |
|
564 throw t; |
|
565 } |
|
566 } |
|
567 |
|
568 /** |
|
569 * Acquires in shared timed mode. |
|
570 * |
|
571 * @param arg the acquire argument |
|
572 * @param nanosTimeout max wait time |
|
573 * @return {@code true} if acquired |
|
574 */ |
|
575 private boolean doAcquireSharedNanos(long arg, long nanosTimeout) |
|
576 throws InterruptedException { |
|
577 if (nanosTimeout <= 0L) |
|
578 return false; |
|
579 final long deadline = System.nanoTime() + nanosTimeout; |
|
580 final Node node = addWaiter(Node.SHARED); |
|
581 try { |
|
582 for (;;) { |
|
583 final Node p = node.predecessor(); |
|
584 if (p == head) { |
|
585 long r = tryAcquireShared(arg); |
|
586 if (r >= 0) { |
|
587 setHeadAndPropagate(node, r); |
|
588 p.next = null; // help GC |
|
589 return true; |
|
590 } |
|
591 } |
|
592 nanosTimeout = deadline - System.nanoTime(); |
|
593 if (nanosTimeout <= 0L) { |
|
594 cancelAcquire(node); |
|
595 return false; |
|
596 } |
|
597 if (shouldParkAfterFailedAcquire(p, node) && |
|
598 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) |
|
599 LockSupport.parkNanos(this, nanosTimeout); |
|
600 if (Thread.interrupted()) |
|
601 throw new InterruptedException(); |
|
602 } |
|
603 } catch (Throwable t) { |
|
604 cancelAcquire(node); |
|
605 throw t; |
|
606 } |
|
607 } |
411 } |
608 |
412 |
609 // Main exported methods |
413 // Main exported methods |
610 |
414 |
611 /** |
415 /** |
754 * @param arg the acquire argument. This value is conveyed to |
558 * @param arg the acquire argument. This value is conveyed to |
755 * {@link #tryAcquire} but is otherwise uninterpreted and |
559 * {@link #tryAcquire} but is otherwise uninterpreted and |
756 * can represent anything you like. |
560 * can represent anything you like. |
757 */ |
561 */ |
758 public final void acquire(long arg) { |
562 public final void acquire(long arg) { |
759 if (!tryAcquire(arg) && |
563 if (!tryAcquire(arg)) |
760 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) |
564 acquire(null, arg, false, false, false, 0L); |
761 selfInterrupt(); |
|
762 } |
565 } |
763 |
566 |
764 /** |
567 /** |
765 * Acquires in exclusive mode, aborting if interrupted. |
568 * Acquires in exclusive mode, aborting if interrupted. |
766 * Implemented by first checking interrupt status, then invoking |
569 * Implemented by first checking interrupt status, then invoking |
774 * {@link #tryAcquire} but is otherwise uninterpreted and |
577 * {@link #tryAcquire} but is otherwise uninterpreted and |
775 * can represent anything you like. |
578 * can represent anything you like. |
776 * @throws InterruptedException if the current thread is interrupted |
579 * @throws InterruptedException if the current thread is interrupted |
777 */ |
580 */ |
778 public final void acquireInterruptibly(long arg) |
581 public final void acquireInterruptibly(long arg) |
779 throws InterruptedException { |
582 throws InterruptedException { |
780 if (Thread.interrupted()) |
583 if (Thread.interrupted() || |
584 (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) |
|
781 throw new InterruptedException(); |
585 throw new InterruptedException(); |
782 if (!tryAcquire(arg)) |
|
783 doAcquireInterruptibly(arg); |
|
784 } |
586 } |
785 |
587 |
786 /** |
588 /** |
787 * Attempts to acquire in exclusive mode, aborting if interrupted, |
589 * Attempts to acquire in exclusive mode, aborting if interrupted, |
788 * and failing if the given timeout elapses. Implemented by first |
590 * and failing if the given timeout elapses. Implemented by first |
799 * @param nanosTimeout the maximum number of nanoseconds to wait |
601 * @param nanosTimeout the maximum number of nanoseconds to wait |
800 * @return {@code true} if acquired; {@code false} if timed out |
602 * @return {@code true} if acquired; {@code false} if timed out |
801 * @throws InterruptedException if the current thread is interrupted |
603 * @throws InterruptedException if the current thread is interrupted |
802 */ |
604 */ |
803 public final boolean tryAcquireNanos(long arg, long nanosTimeout) |
605 public final boolean tryAcquireNanos(long arg, long nanosTimeout) |
804 throws InterruptedException { |
606 throws InterruptedException { |
805 if (Thread.interrupted()) |
607 if (!Thread.interrupted()) { |
806 throw new InterruptedException(); |
608 if (tryAcquire(arg)) |
807 return tryAcquire(arg) || |
609 return true; |
808 doAcquireNanos(arg, nanosTimeout); |
610 if (nanosTimeout <= 0L) |
611 return false; |
|
612 int stat = acquire(null, arg, false, true, true, |
|
613 System.nanoTime() + nanosTimeout); |
|
614 if (stat > 0) |
|
615 return true; |
|
616 if (stat == 0) |
|
617 return false; |
|
618 } |
|
619 throw new InterruptedException(); |
|
809 } |
620 } |
810 |
621 |
811 /** |
622 /** |
812 * Releases in exclusive mode. Implemented by unblocking one or |
623 * Releases in exclusive mode. Implemented by unblocking one or |
813 * more threads if {@link #tryRelease} returns true. |
624 * more threads if {@link #tryRelease} returns true. |
818 * can represent anything you like. |
629 * can represent anything you like. |
819 * @return the value returned from {@link #tryRelease} |
630 * @return the value returned from {@link #tryRelease} |
820 */ |
631 */ |
821 public final boolean release(long arg) { |
632 public final boolean release(long arg) { |
822 if (tryRelease(arg)) { |
633 if (tryRelease(arg)) { |
823 Node h = head; |
634 signalNext(head); |
824 if (h != null && h.waitStatus != 0) |
|
825 unparkSuccessor(h); |
|
826 return true; |
635 return true; |
827 } |
636 } |
828 return false; |
637 return false; |
829 } |
638 } |
830 |
639 |
839 * {@link #tryAcquireShared} but is otherwise uninterpreted |
648 * {@link #tryAcquireShared} but is otherwise uninterpreted |
840 * and can represent anything you like. |
649 * and can represent anything you like. |
841 */ |
650 */ |
842 public final void acquireShared(long arg) { |
651 public final void acquireShared(long arg) { |
843 if (tryAcquireShared(arg) < 0) |
652 if (tryAcquireShared(arg) < 0) |
844 doAcquireShared(arg); |
653 acquire(null, arg, true, false, false, 0L); |
845 } |
654 } |
846 |
655 |
847 /** |
656 /** |
848 * Acquires in shared mode, aborting if interrupted. Implemented |
657 * Acquires in shared mode, aborting if interrupted. Implemented |
849 * by first checking interrupt status, then invoking at least once |
658 * by first checking interrupt status, then invoking at least once |
856 * otherwise uninterpreted and can represent anything |
665 * otherwise uninterpreted and can represent anything |
857 * you like. |
666 * you like. |
858 * @throws InterruptedException if the current thread is interrupted |
667 * @throws InterruptedException if the current thread is interrupted |
859 */ |
668 */ |
860 public final void acquireSharedInterruptibly(long arg) |
669 public final void acquireSharedInterruptibly(long arg) |
861 throws InterruptedException { |
670 throws InterruptedException { |
862 if (Thread.interrupted()) |
671 if (Thread.interrupted() || |
672 (tryAcquireShared(arg) < 0 && |
|
673 acquire(null, arg, true, true, false, 0L) < 0)) |
|
863 throw new InterruptedException(); |
674 throw new InterruptedException(); |
864 if (tryAcquireShared(arg) < 0) |
|
865 doAcquireSharedInterruptibly(arg); |
|
866 } |
675 } |
867 |
676 |
868 /** |
677 /** |
869 * Attempts to acquire in shared mode, aborting if interrupted, and |
678 * Attempts to acquire in shared mode, aborting if interrupted, and |
870 * failing if the given timeout elapses. Implemented by first |
679 * failing if the given timeout elapses. Implemented by first |
881 * @return {@code true} if acquired; {@code false} if timed out |
690 * @return {@code true} if acquired; {@code false} if timed out |
882 * @throws InterruptedException if the current thread is interrupted |
691 * @throws InterruptedException if the current thread is interrupted |
883 */ |
692 */ |
884 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) |
693 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) |
885 throws InterruptedException { |
694 throws InterruptedException { |
886 if (Thread.interrupted()) |
695 if (!Thread.interrupted()) { |
887 throw new InterruptedException(); |
696 if (tryAcquireShared(arg) >= 0) |
888 return tryAcquireShared(arg) >= 0 || |
697 return true; |
889 doAcquireSharedNanos(arg, nanosTimeout); |
698 if (nanosTimeout <= 0L) |
699 return false; |
|
700 int stat = acquire(null, arg, true, true, true, |
|
701 System.nanoTime() + nanosTimeout); |
|
702 if (stat > 0) |
|
703 return true; |
|
704 if (stat == 0) |
|
705 return false; |
|
706 } |
|
707 throw new InterruptedException(); |
|
890 } |
708 } |
891 |
709 |
892 /** |
710 /** |
893 * Releases in shared mode. Implemented by unblocking one or more |
711 * Releases in shared mode. Implemented by unblocking one or more |
894 * threads if {@link #tryReleaseShared} returns true. |
712 * threads if {@link #tryReleaseShared} returns true. |
898 * and can represent anything you like. |
716 * and can represent anything you like. |
899 * @return the value returned from {@link #tryReleaseShared} |
717 * @return the value returned from {@link #tryReleaseShared} |
900 */ |
718 */ |
901 public final boolean releaseShared(long arg) { |
719 public final boolean releaseShared(long arg) { |
902 if (tryReleaseShared(arg)) { |
720 if (tryReleaseShared(arg)) { |
903 doReleaseShared(); |
721 signalNext(head); |
904 return true; |
722 return true; |
905 } |
723 } |
906 return false; |
724 return false; |
907 } |
725 } |
908 |
726 |
916 * |
734 * |
917 * @return {@code true} if there may be other threads waiting to acquire |
735 * @return {@code true} if there may be other threads waiting to acquire |
918 */ |
736 */ |
919 public final boolean hasQueuedThreads() { |
737 public final boolean hasQueuedThreads() { |
920 for (Node p = tail, h = head; p != h && p != null; p = p.prev) |
738 for (Node p = tail, h = head; p != h && p != null; p = p.prev) |
921 if (p.waitStatus <= 0) |
739 if (p.status >= 0) |
922 return true; |
740 return true; |
923 return false; |
741 return false; |
924 } |
742 } |
925 |
743 |
926 /** |
744 /** |
946 * |
764 * |
947 * @return the first (longest-waiting) thread in the queue, or |
765 * @return the first (longest-waiting) thread in the queue, or |
948 * {@code null} if no threads are currently queued |
766 * {@code null} if no threads are currently queued |
949 */ |
767 */ |
950 public final Thread getFirstQueuedThread() { |
768 public final Thread getFirstQueuedThread() { |
951 // handle only fast path, else relay |
769 Thread first = null, w; Node h, s; |
952 return (head == tail) ? null : fullGetFirstQueuedThread(); |
770 if ((h = head) != null && ((s = h.next) == null || |
953 } |
771 (first = s.waiter) == null || |
954 |
772 s.prev == null)) { |
955 /** |
773 // traverse from tail on stale reads |
956 * Version of getFirstQueuedThread called when fastpath fails. |
774 for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) |
957 */ |
775 if ((w = p.waiter) != null) |
958 private Thread fullGetFirstQueuedThread() { |
776 first = w; |
959 /* |
777 } |
960 * The first node is normally head.next. Try to get its |
778 return first; |
961 * thread field, ensuring consistent reads: If thread |
|
962 * field is nulled out or s.prev is no longer head, then |
|
963 * some other thread(s) concurrently performed setHead in |
|
964 * between some of our reads. We try this twice before |
|
965 * resorting to traversal. |
|
966 */ |
|
967 Node h, s; |
|
968 Thread st; |
|
969 if (((h = head) != null && (s = h.next) != null && |
|
970 s.prev == head && (st = s.thread) != null) || |
|
971 ((h = head) != null && (s = h.next) != null && |
|
972 s.prev == head && (st = s.thread) != null)) |
|
973 return st; |
|
974 |
|
975 /* |
|
976 * Head's next field might not have been set yet, or may have |
|
977 * been unset after setHead. So we must check to see if tail |
|
978 * is actually first node. If not, we continue on, safely |
|
979 * traversing from tail back to head to find first, |
|
980 * guaranteeing termination. |
|
981 */ |
|
982 |
|
983 Thread firstThread = null; |
|
984 for (Node p = tail; p != null && p != head; p = p.prev) { |
|
985 Thread t = p.thread; |
|
986 if (t != null) |
|
987 firstThread = t; |
|
988 } |
|
989 return firstThread; |
|
990 } |
779 } |
991 |
780 |
992 /** |
781 /** |
993 * Returns true if the given thread is currently queued. |
782 * Returns true if the given thread is currently queued. |
994 * |
783 * |
1001 */ |
790 */ |
1002 public final boolean isQueued(Thread thread) { |
791 public final boolean isQueued(Thread thread) { |
1003 if (thread == null) |
792 if (thread == null) |
1004 throw new NullPointerException(); |
793 throw new NullPointerException(); |
1005 for (Node p = tail; p != null; p = p.prev) |
794 for (Node p = tail; p != null; p = p.prev) |
1006 if (p.thread == thread) |
795 if (p.waiter == thread) |
1007 return true; |
796 return true; |
1008 return false; |
797 return false; |
1009 } |
798 } |
1010 |
799 |
1011 /** |
800 /** |
1017 * is not the first queued thread. Used only as a heuristic in |
806 * is not the first queued thread. Used only as a heuristic in |
1018 * ReentrantReadWriteLock. |
807 * ReentrantReadWriteLock. |
1019 */ |
808 */ |
1020 final boolean apparentlyFirstQueuedIsExclusive() { |
809 final boolean apparentlyFirstQueuedIsExclusive() { |
1021 Node h, s; |
810 Node h, s; |
1022 return (h = head) != null && |
811 return (h = head) != null && (s = h.next) != null && |
1023 (s = h.next) != null && |
812 !(s instanceof SharedNode) && s.waiter != null; |
1024 !s.isShared() && |
|
1025 s.thread != null; |
|
1026 } |
813 } |
1027 |
814 |
1028 /** |
815 /** |
1029 * Queries whether any threads have been waiting to acquire longer |
816 * Queries whether any threads have been waiting to acquire longer |
1030 * than the current thread. |
817 * than the current thread. |
1050 * (unless this is a reentrant acquire). For example, the {@code |
837 * (unless this is a reentrant acquire). For example, the {@code |
1051 * tryAcquire} method for a fair, reentrant, exclusive mode |
838 * tryAcquire} method for a fair, reentrant, exclusive mode |
1052 * synchronizer might look like this: |
839 * synchronizer might look like this: |
1053 * |
840 * |
1054 * <pre> {@code |
841 * <pre> {@code |
1055 * protected boolean tryAcquire(int arg) { |
842 * protected boolean tryAcquire(long arg) { |
1056 * if (isHeldExclusively()) { |
843 * if (isHeldExclusively()) { |
1057 * // A reentrant acquire; increment hold count |
844 * // A reentrant acquire; increment hold count |
1058 * return true; |
845 * return true; |
1059 * } else if (hasQueuedPredecessors()) { |
846 * } else if (hasQueuedPredecessors()) { |
1060 * return false; |
847 * return false; |
1067 * current thread, and {@code false} if the current thread |
854 * current thread, and {@code false} if the current thread |
1068 * is at the head of the queue or the queue is empty |
855 * is at the head of the queue or the queue is empty |
1069 * @since 1.7 |
856 * @since 1.7 |
1070 */ |
857 */ |
1071 public final boolean hasQueuedPredecessors() { |
858 public final boolean hasQueuedPredecessors() { |
1072 Node h, s; |
859 Thread first = null; Node h, s; |
1073 if ((h = head) != null) { |
860 if ((h = head) != null && ((s = h.next) == null || |
1074 if ((s = h.next) == null || s.waitStatus > 0) { |
861 (first = s.waiter) == null || |
1075 s = null; // traverse in case of concurrent cancellation |
862 s.prev == null)) |
1076 for (Node p = tail; p != h && p != null; p = p.prev) { |
863 first = getFirstQueuedThread(); // retry via getFirstQueuedThread |
1077 if (p.waitStatus <= 0) |
864 return first != null && first != Thread.currentThread(); |
1078 s = p; |
|
1079 } |
|
1080 } |
|
1081 if (s != null && s.thread != Thread.currentThread()) |
|
1082 return true; |
|
1083 } |
|
1084 return false; |
|
1085 } |
865 } |
1086 |
866 |
1087 // Instrumentation and monitoring methods |
867 // Instrumentation and monitoring methods |
1088 |
868 |
1089 /** |
869 /** |
1096 * @return the estimated number of threads waiting to acquire |
876 * @return the estimated number of threads waiting to acquire |
1097 */ |
877 */ |
1098 public final int getQueueLength() { |
878 public final int getQueueLength() { |
1099 int n = 0; |
879 int n = 0; |
1100 for (Node p = tail; p != null; p = p.prev) { |
880 for (Node p = tail; p != null; p = p.prev) { |
1101 if (p.thread != null) |
881 if (p.waiter != null) |
1102 ++n; |
882 ++n; |
1103 } |
883 } |
1104 return n; |
884 return n; |
1105 } |
885 } |
1106 |
886 |
1116 * @return the collection of threads |
896 * @return the collection of threads |
1117 */ |
897 */ |
1118 public final Collection<Thread> getQueuedThreads() { |
898 public final Collection<Thread> getQueuedThreads() { |
1119 ArrayList<Thread> list = new ArrayList<>(); |
899 ArrayList<Thread> list = new ArrayList<>(); |
1120 for (Node p = tail; p != null; p = p.prev) { |
900 for (Node p = tail; p != null; p = p.prev) { |
1121 Thread t = p.thread; |
901 Thread t = p.waiter; |
1122 if (t != null) |
902 if (t != null) |
1123 list.add(t); |
903 list.add(t); |
1124 } |
904 } |
1125 return list; |
905 return list; |
1126 } |
906 } |
1134 * @return the collection of threads |
914 * @return the collection of threads |
1135 */ |
915 */ |
1136 public final Collection<Thread> getExclusiveQueuedThreads() { |
916 public final Collection<Thread> getExclusiveQueuedThreads() { |
1137 ArrayList<Thread> list = new ArrayList<>(); |
917 ArrayList<Thread> list = new ArrayList<>(); |
1138 for (Node p = tail; p != null; p = p.prev) { |
918 for (Node p = tail; p != null; p = p.prev) { |
1139 if (!p.isShared()) { |
919 if (!(p instanceof SharedNode)) { |
1140 Thread t = p.thread; |
920 Thread t = p.waiter; |
1141 if (t != null) |
921 if (t != null) |
1142 list.add(t); |
922 list.add(t); |
1143 } |
923 } |
1144 } |
924 } |
1145 return list; |
925 return list; |
1154 * @return the collection of threads |
934 * @return the collection of threads |
1155 */ |
935 */ |
1156 public final Collection<Thread> getSharedQueuedThreads() { |
936 public final Collection<Thread> getSharedQueuedThreads() { |
1157 ArrayList<Thread> list = new ArrayList<>(); |
937 ArrayList<Thread> list = new ArrayList<>(); |
1158 for (Node p = tail; p != null; p = p.prev) { |
938 for (Node p = tail; p != null; p = p.prev) { |
1159 if (p.isShared()) { |
939 if (p instanceof SharedNode) { |
1160 Thread t = p.thread; |
940 Thread t = p.waiter; |
1161 if (t != null) |
941 if (t != null) |
1162 list.add(t); |
942 list.add(t); |
1163 } |
943 } |
1164 } |
944 } |
1165 return list; |
945 return list; |
1176 */ |
956 */ |
1177 public String toString() { |
957 public String toString() { |
1178 return super.toString() |
958 return super.toString() |
1179 + "[State = " + getState() + ", " |
959 + "[State = " + getState() + ", " |
1180 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; |
960 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; |
1181 } |
|
1182 |
|
1183 |
|
1184 // Internal support methods for Conditions |
|
1185 |
|
1186 /** |
|
1187 * Returns true if a node, always one that was initially placed on |
|
1188 * a condition queue, is now waiting to reacquire on sync queue. |
|
1189 * @param node the node |
|
1190 * @return true if is reacquiring |
|
1191 */ |
|
1192 final boolean isOnSyncQueue(Node node) { |
|
1193 if (node.waitStatus == Node.CONDITION || node.prev == null) |
|
1194 return false; |
|
1195 if (node.next != null) // If has successor, it must be on queue |
|
1196 return true; |
|
1197 /* |
|
1198 * node.prev can be non-null, but not yet on queue because |
|
1199 * the CAS to place it on queue can fail. So we have to |
|
1200 * traverse from tail to make sure it actually made it. It |
|
1201 * will always be near the tail in calls to this method, and |
|
1202 * unless the CAS failed (which is unlikely), it will be |
|
1203 * there, so we hardly ever traverse much. |
|
1204 */ |
|
1205 return findNodeFromTail(node); |
|
1206 } |
|
1207 |
|
1208 /** |
|
1209 * Returns true if node is on sync queue by searching backwards from tail. |
|
1210 * Called only when needed by isOnSyncQueue. |
|
1211 * @return true if present |
|
1212 */ |
|
1213 private boolean findNodeFromTail(Node node) { |
|
1214 // We check for node first, since it's likely to be at or near tail. |
|
1215 // tail is known to be non-null, so we could re-order to "save" |
|
1216 // one null check, but we leave it this way to help the VM. |
|
1217 for (Node p = tail;;) { |
|
1218 if (p == node) |
|
1219 return true; |
|
1220 if (p == null) |
|
1221 return false; |
|
1222 p = p.prev; |
|
1223 } |
|
1224 } |
|
1225 |
|
1226 /** |
|
1227 * Transfers a node from a condition queue onto sync queue. |
|
1228 * Returns true if successful. |
|
1229 * @param node the node |
|
1230 * @return true if successfully transferred (else the node was |
|
1231 * cancelled before signal) |
|
1232 */ |
|
1233 final boolean transferForSignal(Node node) { |
|
1234 /* |
|
1235 * If cannot change waitStatus, the node has been cancelled. |
|
1236 */ |
|
1237 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) |
|
1238 return false; |
|
1239 |
|
1240 /* |
|
1241 * Splice onto queue and try to set waitStatus of predecessor to |
|
1242 * indicate that thread is (probably) waiting. If cancelled or |
|
1243 * attempt to set waitStatus fails, wake up to resync (in which |
|
1244 * case the waitStatus can be transiently and harmlessly wrong). |
|
1245 */ |
|
1246 Node p = enq(node); |
|
1247 int ws = p.waitStatus; |
|
1248 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) |
|
1249 LockSupport.unpark(node.thread); |
|
1250 return true; |
|
1251 } |
|
1252 |
|
1253 /** |
|
1254 * Transfers node, if necessary, to sync queue after a cancelled wait. |
|
1255 * Returns true if thread was cancelled before being signalled. |
|
1256 * |
|
1257 * @param node the node |
|
1258 * @return true if cancelled before the node was signalled |
|
1259 */ |
|
1260 final boolean transferAfterCancelledWait(Node node) { |
|
1261 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { |
|
1262 enq(node); |
|
1263 return true; |
|
1264 } |
|
1265 /* |
|
1266 * If we lost out to a signal(), then we can't proceed |
|
1267 * until it finishes its enq(). Cancelling during an |
|
1268 * incomplete transfer is both rare and transient, so just |
|
1269 * spin. |
|
1270 */ |
|
1271 while (!isOnSyncQueue(node)) |
|
1272 Thread.yield(); |
|
1273 return false; |
|
1274 } |
|
1275 |
|
1276 /** |
|
1277 * Invokes release with current state value; returns saved state. |
|
1278 * Cancels node and throws exception on failure. |
|
1279 * @param node the condition node for this wait |
|
1280 * @return previous sync state |
|
1281 */ |
|
1282 final long fullyRelease(Node node) { |
|
1283 try { |
|
1284 long savedState = getState(); |
|
1285 if (release(savedState)) |
|
1286 return savedState; |
|
1287 throw new IllegalMonitorStateException(); |
|
1288 } catch (Throwable t) { |
|
1289 node.waitStatus = Node.CANCELLED; |
|
1290 throw t; |
|
1291 } |
|
1292 } |
961 } |
1293 |
962 |
1294 // Instrumentation methods for conditions |
963 // Instrumentation methods for conditions |
1295 |
964 |
1296 /** |
965 /** |
1382 * condition semantics that rely on those of the associated |
1051 * condition semantics that rely on those of the associated |
1383 * {@code AbstractQueuedLongSynchronizer}. |
1052 * {@code AbstractQueuedLongSynchronizer}. |
1384 * |
1053 * |
1385 * <p>This class is Serializable, but all fields are transient, |
1054 * <p>This class is Serializable, but all fields are transient, |
1386 * so deserialized conditions have no waiters. |
1055 * so deserialized conditions have no waiters. |
1387 * |
|
1388 * @since 1.6 |
|
1389 */ |
1056 */ |
1390 public class ConditionObject implements Condition, java.io.Serializable { |
1057 public class ConditionObject implements Condition, java.io.Serializable { |
1391 private static final long serialVersionUID = 1173984872572414699L; |
1058 private static final long serialVersionUID = 1173984872572414699L; |
1392 /** First node of condition queue. */ |
1059 /** First node of condition queue. */ |
1393 private transient Node firstWaiter; |
1060 private transient ConditionNode firstWaiter; |
1394 /** Last node of condition queue. */ |
1061 /** Last node of condition queue. */ |
1395 private transient Node lastWaiter; |
1062 private transient ConditionNode lastWaiter; |
1396 |
1063 |
1397 /** |
1064 /** |
1398 * Creates a new {@code ConditionObject} instance. |
1065 * Creates a new {@code ConditionObject} instance. |
1399 */ |
1066 */ |
1400 public ConditionObject() { } |
1067 public ConditionObject() { } |
1401 |
1068 |
1402 // Internal methods |
1069 // Signalling methods |
1403 |
1070 |
1404 /** |
1071 /** |
1405 * Adds a new waiter to wait queue. |
1072 * Removes and transfers one or all waiters to sync queue. |
1406 * @return its new wait node |
1073 */ |
1407 */ |
1074 private void doSignal(ConditionNode first, boolean all) { |
1408 private Node addConditionWaiter() { |
1075 while (first != null) { |
1409 if (!isHeldExclusively()) |
1076 ConditionNode next = first.nextWaiter; |
1410 throw new IllegalMonitorStateException(); |
1077 if ((firstWaiter = next) == null) |
1411 Node t = lastWaiter; |
|
1412 // If lastWaiter is cancelled, clean out. |
|
1413 if (t != null && t.waitStatus != Node.CONDITION) { |
|
1414 unlinkCancelledWaiters(); |
|
1415 t = lastWaiter; |
|
1416 } |
|
1417 |
|
1418 Node node = new Node(Node.CONDITION); |
|
1419 |
|
1420 if (t == null) |
|
1421 firstWaiter = node; |
|
1422 else |
|
1423 t.nextWaiter = node; |
|
1424 lastWaiter = node; |
|
1425 return node; |
|
1426 } |
|
1427 |
|
1428 /** |
|
1429 * Removes and transfers nodes until hit non-cancelled one or |
|
1430 * null. Split out from signal in part to encourage compilers |
|
1431 * to inline the case of no waiters. |
|
1432 * @param first (non-null) the first node on condition queue |
|
1433 */ |
|
1434 private void doSignal(Node first) { |
|
1435 do { |
|
1436 if ( (firstWaiter = first.nextWaiter) == null) |
|
1437 lastWaiter = null; |
1078 lastWaiter = null; |
1438 first.nextWaiter = null; |
1079 if ((first.getAndUnsetStatus(COND) & COND) != 0) { |
1439 } while (!transferForSignal(first) && |
1080 enqueue(first); |
1440 (first = firstWaiter) != null); |
1081 if (!all) |
1441 } |
1082 break; |
1442 |
1083 } |
1443 /** |
|
1444 * Removes and transfers all nodes. |
|
1445 * @param first (non-null) the first node on condition queue |
|
1446 */ |
|
1447 private void doSignalAll(Node first) { |
|
1448 lastWaiter = firstWaiter = null; |
|
1449 do { |
|
1450 Node next = first.nextWaiter; |
|
1451 first.nextWaiter = null; |
|
1452 transferForSignal(first); |
|
1453 first = next; |
1084 first = next; |
1454 } while (first != null); |
1085 } |
1455 } |
1086 } |
1456 |
|
1457 /** |
|
1458 * Unlinks cancelled waiter nodes from condition queue. |
|
1459 * Called only while holding lock. This is called when |
|
1460 * cancellation occurred during condition wait, and upon |
|
1461 * insertion of a new waiter when lastWaiter is seen to have |
|
1462 * been cancelled. This method is needed to avoid garbage |
|
1463 * retention in the absence of signals. So even though it may |
|
1464 * require a full traversal, it comes into play only when |
|
1465 * timeouts or cancellations occur in the absence of |
|
1466 * signals. It traverses all nodes rather than stopping at a |
|
1467 * particular target to unlink all pointers to garbage nodes |
|
1468 * without requiring many re-traversals during cancellation |
|
1469 * storms. |
|
1470 */ |
|
1471 private void unlinkCancelledWaiters() { |
|
1472 Node t = firstWaiter; |
|
1473 Node trail = null; |
|
1474 while (t != null) { |
|
1475 Node next = t.nextWaiter; |
|
1476 if (t.waitStatus != Node.CONDITION) { |
|
1477 t.nextWaiter = null; |
|
1478 if (trail == null) |
|
1479 firstWaiter = next; |
|
1480 else |
|
1481 trail.nextWaiter = next; |
|
1482 if (next == null) |
|
1483 lastWaiter = trail; |
|
1484 } |
|
1485 else |
|
1486 trail = t; |
|
1487 t = next; |
|
1488 } |
|
1489 } |
|
1490 |
|
1491 // public methods |
|
1492 |
1087 |
1493 /** |
1088 /** |
1494 * Moves the longest-waiting thread, if one exists, from the |
1089 * Moves the longest-waiting thread, if one exists, from the |
1495 * wait queue for this condition to the wait queue for the |
1090 * wait queue for this condition to the wait queue for the |
1496 * owning lock. |
1091 * owning lock. |
1497 * |
1092 * |
1498 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} |
1093 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} |
1499 * returns {@code false} |
1094 * returns {@code false} |
1500 */ |
1095 */ |
1501 public final void signal() { |
1096 public final void signal() { |
1097 ConditionNode first = firstWaiter; |
|
1502 if (!isHeldExclusively()) |
1098 if (!isHeldExclusively()) |
1503 throw new IllegalMonitorStateException(); |
1099 throw new IllegalMonitorStateException(); |
1504 Node first = firstWaiter; |
|
1505 if (first != null) |
1100 if (first != null) |
1506 doSignal(first); |
1101 doSignal(first, false); |
1507 } |
1102 } |
1508 |
1103 |
1509 /** |
1104 /** |
1510 * Moves all threads from the wait queue for this condition to |
1105 * Moves all threads from the wait queue for this condition to |
1511 * the wait queue for the owning lock. |
1106 * the wait queue for the owning lock. |
1512 * |
1107 * |
1513 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} |
1108 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} |
1514 * returns {@code false} |
1109 * returns {@code false} |
1515 */ |
1110 */ |
1516 public final void signalAll() { |
1111 public final void signalAll() { |
1112 ConditionNode first = firstWaiter; |
|
1517 if (!isHeldExclusively()) |
1113 if (!isHeldExclusively()) |
1518 throw new IllegalMonitorStateException(); |
1114 throw new IllegalMonitorStateException(); |
1519 Node first = firstWaiter; |
|
1520 if (first != null) |
1115 if (first != null) |
1521 doSignalAll(first); |
1116 doSignal(first, true); |
1117 } |
|
1118 |
|
1119 // Waiting methods |
|
1120 |
|
1121 /** |
|
1122 * Adds node to condition list and releases lock. |
|
1123 * |
|
1124 * @param node the node |
|
1125 * @return savedState to reacquire after wait |
|
1126 */ |
|
1127 private long enableWait(ConditionNode node) { |
|
1128 if (isHeldExclusively()) { |
|
1129 node.waiter = Thread.currentThread(); |
|
1130 node.setStatusRelaxed(COND | WAITING); |
|
1131 ConditionNode last = lastWaiter; |
|
1132 if (last == null) |
|
1133 firstWaiter = node; |
|
1134 else |
|
1135 last.nextWaiter = node; |
|
1136 lastWaiter = node; |
|
1137 long savedState = getState(); |
|
1138 if (release(savedState)) |
|
1139 return savedState; |
|
1140 } |
|
1141 node.status = CANCELLED; // lock not held or inconsistent |
|
1142 throw new IllegalMonitorStateException(); |
|
1143 } |
|
1144 |
|
1145 /** |
|
1146 * Returns true if a node that was initially placed on a condition |
|
1147 * queue is now ready to reacquire on sync queue. |
|
1148 * @param node the node |
|
1149 * @return true if is reacquiring |
|
1150 */ |
|
1151 private boolean canReacquire(ConditionNode node) { |
|
1152 // check links, not status to avoid enqueue race |
|
1153 return node != null && node.prev != null && isEnqueued(node); |
|
1154 } |
|
1155 |
|
1156 /** |
|
1157 * Unlinks the given node and other non-waiting nodes from |
|
1158 * condition queue unless already unlinked. |
|
1159 */ |
|
1160 private void unlinkCancelledWaiters(ConditionNode node) { |
|
1161 if (node == null || node.nextWaiter != null || node == lastWaiter) { |
|
1162 ConditionNode w = firstWaiter, trail = null; |
|
1163 while (w != null) { |
|
1164 ConditionNode next = w.nextWaiter; |
|
1165 if ((w.status & COND) == 0) { |
|
1166 w.nextWaiter = null; |
|
1167 if (trail == null) |
|
1168 firstWaiter = next; |
|
1169 else |
|
1170 trail.nextWaiter = next; |
|
1171 if (next == null) |
|
1172 lastWaiter = trail; |
|
1173 } else |
|
1174 trail = w; |
|
1175 w = next; |
|
1176 } |
|
1177 } |
|
1522 } |
1178 } |
1523 |
1179 |
1524 /** |
1180 /** |
1525 * Implements uninterruptible condition wait. |
1181 * Implements uninterruptible condition wait. |
1526 * <ol> |
1182 * <ol> |
1531 * <li>Reacquire by invoking specialized version of |
1187 * <li>Reacquire by invoking specialized version of |
1532 * {@link #acquire} with saved state as argument. |
1188 * {@link #acquire} with saved state as argument. |
1533 * </ol> |
1189 * </ol> |
1534 */ |
1190 */ |
1535 public final void awaitUninterruptibly() { |
1191 public final void awaitUninterruptibly() { |
1536 Node node = addConditionWaiter(); |
1192 ConditionNode node = new ConditionNode(); |
1537 long savedState = fullyRelease(node); |
1193 long savedState = enableWait(node); |
1194 LockSupport.setCurrentBlocker(this); // for back-compatibility |
|
1538 boolean interrupted = false; |
1195 boolean interrupted = false; |
1539 while (!isOnSyncQueue(node)) { |
1196 while (!canReacquire(node)) { |
1540 LockSupport.park(this); |
|
1541 if (Thread.interrupted()) |
1197 if (Thread.interrupted()) |
1542 interrupted = true; |
1198 interrupted = true; |
1543 } |
1199 else if ((node.status & COND) != 0) { |
1544 if (acquireQueued(node, savedState) || interrupted) |
1200 try { |
1545 selfInterrupt(); |
1201 ForkJoinPool.managedBlock(node); |
1546 } |
1202 } catch (InterruptedException ie) { |
1547 |
1203 interrupted = true; |
1548 /* |
1204 } |
1549 * For interruptible waits, we need to track whether to throw |
1205 } else |
1550 * InterruptedException, if interrupted while blocked on |
1206 Thread.onSpinWait(); // awoke while enqueuing |
1551 * condition, versus reinterrupt current thread, if |
1207 } |
1552 * interrupted while blocked waiting to re-acquire. |
1208 LockSupport.setCurrentBlocker(null); |
1553 */ |
1209 node.clearStatus(); |
1554 |
1210 acquire(node, savedState, false, false, false, 0L); |
1555 /** Mode meaning to reinterrupt on exit from wait */ |
1211 if (interrupted) |
1556 private static final int REINTERRUPT = 1; |
1212 Thread.currentThread().interrupt(); |
1557 /** Mode meaning to throw InterruptedException on exit from wait */ |
|
1558 private static final int THROW_IE = -1; |
|
1559 |
|
1560 /** |
|
1561 * Checks for interrupt, returning THROW_IE if interrupted |
|
1562 * before signalled, REINTERRUPT if after signalled, or |
|
1563 * 0 if not interrupted. |
|
1564 */ |
|
1565 private int checkInterruptWhileWaiting(Node node) { |
|
1566 return Thread.interrupted() ? |
|
1567 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : |
|
1568 0; |
|
1569 } |
|
1570 |
|
1571 /** |
|
1572 * Throws InterruptedException, reinterrupts current thread, or |
|
1573 * does nothing, depending on mode. |
|
1574 */ |
|
1575 private void reportInterruptAfterWait(int interruptMode) |
|
1576 throws InterruptedException { |
|
1577 if (interruptMode == THROW_IE) |
|
1578 throw new InterruptedException(); |
|
1579 else if (interruptMode == REINTERRUPT) |
|
1580 selfInterrupt(); |
|
1581 } |
1213 } |
1582 |
1214 |
1583 /** |
1215 /** |
1584 * Implements interruptible condition wait. |
1216 * Implements interruptible condition wait. |
1585 * <ol> |
1217 * <ol> |
1594 * </ol> |
1226 * </ol> |
1595 */ |
1227 */ |
1596 public final void await() throws InterruptedException { |
1228 public final void await() throws InterruptedException { |
1597 if (Thread.interrupted()) |
1229 if (Thread.interrupted()) |
1598 throw new InterruptedException(); |
1230 throw new InterruptedException(); |
1599 Node node = addConditionWaiter(); |
1231 ConditionNode node = new ConditionNode(); |
1600 long savedState = fullyRelease(node); |
1232 long savedState = enableWait(node); |
1601 int interruptMode = 0; |
1233 LockSupport.setCurrentBlocker(this); // for back-compatibility |
1602 while (!isOnSyncQueue(node)) { |
1234 boolean interrupted = false, cancelled = false; |
1603 LockSupport.park(this); |
1235 while (!canReacquire(node)) { |
1604 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) |
1236 if (interrupted |= Thread.interrupted()) { |
1605 break; |
1237 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) |
1606 } |
1238 break; // else interrupted after signal |
1607 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) |
1239 } else if ((node.status & COND) != 0) { |
1608 interruptMode = REINTERRUPT; |
1240 try { |
1609 if (node.nextWaiter != null) // clean up if cancelled |
1241 ForkJoinPool.managedBlock(node); |
1610 unlinkCancelledWaiters(); |
1242 } catch (InterruptedException ie) { |
1611 if (interruptMode != 0) |
1243 interrupted = true; |
1612 reportInterruptAfterWait(interruptMode); |
1244 } |
1245 } else |
|
1246 Thread.onSpinWait(); // awoke while enqueuing |
|
1247 } |
|
1248 LockSupport.setCurrentBlocker(null); |
|
1249 node.clearStatus(); |
|
1250 acquire(node, savedState, false, false, false, 0L); |
|
1251 if (interrupted) { |
|
1252 if (cancelled) { |
|
1253 unlinkCancelledWaiters(node); |
|
1254 throw new InterruptedException(); |
|
1255 } |
|
1256 Thread.currentThread().interrupt(); |
|
1257 } |
|
1613 } |
1258 } |
1614 |
1259 |
1615 /** |
1260 /** |
1616 * Implements timed condition wait. |
1261 * Implements timed condition wait. |
1617 * <ol> |
1262 * <ol> |
1627 */ |
1272 */ |
1628 public final long awaitNanos(long nanosTimeout) |
1273 public final long awaitNanos(long nanosTimeout) |
1629 throws InterruptedException { |
1274 throws InterruptedException { |
1630 if (Thread.interrupted()) |
1275 if (Thread.interrupted()) |
1631 throw new InterruptedException(); |
1276 throw new InterruptedException(); |
1632 // We don't check for nanosTimeout <= 0L here, to allow |
1277 ConditionNode node = new ConditionNode(); |
1633 // awaitNanos(0) as a way to "yield the lock". |
1278 long savedState = enableWait(node); |
1634 final long deadline = System.nanoTime() + nanosTimeout; |
1279 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; |
1635 long initialNanos = nanosTimeout; |
1280 long deadline = System.nanoTime() + nanos; |
1636 Node node = addConditionWaiter(); |
1281 boolean cancelled = false, interrupted = false; |
1637 long savedState = fullyRelease(node); |
1282 while (!canReacquire(node)) { |
1638 int interruptMode = 0; |
1283 if ((interrupted |= Thread.interrupted()) || |
1639 while (!isOnSyncQueue(node)) { |
1284 (nanos = deadline - System.nanoTime()) <= 0L) { |
1640 if (nanosTimeout <= 0L) { |
1285 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) |
1641 transferAfterCancelledWait(node); |
1286 break; |
1642 break; |
1287 } else |
1643 } |
1288 LockSupport.parkNanos(this, nanos); |
1644 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) |
1289 } |
1645 LockSupport.parkNanos(this, nanosTimeout); |
1290 node.clearStatus(); |
1646 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) |
1291 acquire(node, savedState, false, false, false, 0L); |
1647 break; |
1292 if (cancelled) { |
1648 nanosTimeout = deadline - System.nanoTime(); |
1293 unlinkCancelledWaiters(node); |
1649 } |
1294 if (interrupted) |
1650 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) |
1295 throw new InterruptedException(); |
1651 interruptMode = REINTERRUPT; |
1296 } else if (interrupted) |
1652 if (node.nextWaiter != null) |
1297 Thread.currentThread().interrupt(); |
1653 unlinkCancelledWaiters(); |
|
1654 if (interruptMode != 0) |
|
1655 reportInterruptAfterWait(interruptMode); |
|
1656 long remaining = deadline - System.nanoTime(); // avoid overflow |
1298 long remaining = deadline - System.nanoTime(); // avoid overflow |
1657 return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; |
1299 return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; |
1658 } |
1300 } |
1659 |
1301 |
1660 /** |
1302 /** |
1661 * Implements absolute timed condition wait. |
1303 * Implements absolute timed condition wait. |
1662 * <ol> |
1304 * <ol> |
1674 public final boolean awaitUntil(Date deadline) |
1316 public final boolean awaitUntil(Date deadline) |
1675 throws InterruptedException { |
1317 throws InterruptedException { |
1676 long abstime = deadline.getTime(); |
1318 long abstime = deadline.getTime(); |
1677 if (Thread.interrupted()) |
1319 if (Thread.interrupted()) |
1678 throw new InterruptedException(); |
1320 throw new InterruptedException(); |
1679 Node node = addConditionWaiter(); |
1321 ConditionNode node = new ConditionNode(); |
1680 long savedState = fullyRelease(node); |
1322 long savedState = enableWait(node); |
1681 boolean timedout = false; |
1323 boolean cancelled = false, interrupted = false; |
1682 int interruptMode = 0; |
1324 while (!canReacquire(node)) { |
1683 while (!isOnSyncQueue(node)) { |
1325 if ((interrupted |= Thread.interrupted()) || |
1684 if (System.currentTimeMillis() >= abstime) { |
1326 System.currentTimeMillis() >= abstime) { |
1685 timedout = transferAfterCancelledWait(node); |
1327 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) |
1686 break; |
1328 break; |
1687 } |
1329 } else |
1688 LockSupport.parkUntil(this, abstime); |
1330 LockSupport.parkUntil(this, abstime); |
1689 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) |
1331 } |
1690 break; |
1332 node.clearStatus(); |
1691 } |
1333 acquire(node, savedState, false, false, false, 0L); |
1692 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) |
1334 if (cancelled) { |
1693 interruptMode = REINTERRUPT; |
1335 unlinkCancelledWaiters(node); |
1694 if (node.nextWaiter != null) |
1336 if (interrupted) |
1695 unlinkCancelledWaiters(); |
1337 throw new InterruptedException(); |
1696 if (interruptMode != 0) |
1338 } else if (interrupted) |
1697 reportInterruptAfterWait(interruptMode); |
1339 Thread.currentThread().interrupt(); |
1698 return !timedout; |
1340 return !cancelled; |
1699 } |
1341 } |
1700 |
1342 |
1701 /** |
1343 /** |
1702 * Implements timed condition wait. |
1344 * Implements timed condition wait. |
1703 * <ol> |
1345 * <ol> |
1715 public final boolean await(long time, TimeUnit unit) |
1357 public final boolean await(long time, TimeUnit unit) |
1716 throws InterruptedException { |
1358 throws InterruptedException { |
1717 long nanosTimeout = unit.toNanos(time); |
1359 long nanosTimeout = unit.toNanos(time); |
1718 if (Thread.interrupted()) |
1360 if (Thread.interrupted()) |
1719 throw new InterruptedException(); |
1361 throw new InterruptedException(); |
1720 // We don't check for nanosTimeout <= 0L here, to allow |
1362 ConditionNode node = new ConditionNode(); |
1721 // await(0, unit) as a way to "yield the lock". |
1363 long savedState = enableWait(node); |
1722 final long deadline = System.nanoTime() + nanosTimeout; |
1364 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; |
1723 Node node = addConditionWaiter(); |
1365 long deadline = System.nanoTime() + nanos; |
1724 long savedState = fullyRelease(node); |
1366 boolean cancelled = false, interrupted = false; |
1725 boolean timedout = false; |
1367 while (!canReacquire(node)) { |
1726 int interruptMode = 0; |
1368 if ((interrupted |= Thread.interrupted()) || |
1727 while (!isOnSyncQueue(node)) { |
1369 (nanos = deadline - System.nanoTime()) <= 0L) { |
1728 if (nanosTimeout <= 0L) { |
1370 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) |
1729 timedout = transferAfterCancelledWait(node); |
1371 break; |
1730 break; |
1372 } else |
1731 } |
1373 LockSupport.parkNanos(this, nanos); |
1732 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) |
1374 } |
1733 LockSupport.parkNanos(this, nanosTimeout); |
1375 node.clearStatus(); |
1734 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) |
1376 acquire(node, savedState, false, false, false, 0L); |
1735 break; |
1377 if (cancelled) { |
1736 nanosTimeout = deadline - System.nanoTime(); |
1378 unlinkCancelledWaiters(node); |
1737 } |
1379 if (interrupted) |
1738 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) |
1380 throw new InterruptedException(); |
1739 interruptMode = REINTERRUPT; |
1381 } else if (interrupted) |
1740 if (node.nextWaiter != null) |
1382 Thread.currentThread().interrupt(); |
1741 unlinkCancelledWaiters(); |
1383 return !cancelled; |
1742 if (interruptMode != 0) |
|
1743 reportInterruptAfterWait(interruptMode); |
|
1744 return !timedout; |
|
1745 } |
1384 } |
1746 |
1385 |
1747 // support for instrumentation |
1386 // support for instrumentation |
1748 |
1387 |
1749 /** |
1388 /** |
1765 * returns {@code false} |
1404 * returns {@code false} |
1766 */ |
1405 */ |
1767 protected final boolean hasWaiters() { |
1406 protected final boolean hasWaiters() { |
1768 if (!isHeldExclusively()) |
1407 if (!isHeldExclusively()) |
1769 throw new IllegalMonitorStateException(); |
1408 throw new IllegalMonitorStateException(); |
1770 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { |
1409 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { |
1771 if (w.waitStatus == Node.CONDITION) |
1410 if ((w.status & COND) != 0) |
1772 return true; |
1411 return true; |
1773 } |
1412 } |
1774 return false; |
1413 return false; |
1775 } |
1414 } |
1776 |
1415 |
1785 */ |
1424 */ |
1786 protected final int getWaitQueueLength() { |
1425 protected final int getWaitQueueLength() { |
1787 if (!isHeldExclusively()) |
1426 if (!isHeldExclusively()) |
1788 throw new IllegalMonitorStateException(); |
1427 throw new IllegalMonitorStateException(); |
1789 int n = 0; |
1428 int n = 0; |
1790 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { |
1429 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { |
1791 if (w.waitStatus == Node.CONDITION) |
1430 if ((w.status & COND) != 0) |
1792 ++n; |
1431 ++n; |
1793 } |
1432 } |
1794 return n; |
1433 return n; |
1795 } |
1434 } |
1796 |
1435 |
1805 */ |
1444 */ |
1806 protected final Collection<Thread> getWaitingThreads() { |
1445 protected final Collection<Thread> getWaitingThreads() { |
1807 if (!isHeldExclusively()) |
1446 if (!isHeldExclusively()) |
1808 throw new IllegalMonitorStateException(); |
1447 throw new IllegalMonitorStateException(); |
1809 ArrayList<Thread> list = new ArrayList<>(); |
1448 ArrayList<Thread> list = new ArrayList<>(); |
1810 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { |
1449 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { |
1811 if (w.waitStatus == Node.CONDITION) { |
1450 if ((w.status & COND) != 0) { |
1812 Thread t = w.thread; |
1451 Thread t = w.waiter; |
1813 if (t != null) |
1452 if (t != null) |
1814 list.add(t); |
1453 list.add(t); |
1815 } |
1454 } |
1816 } |
1455 } |
1817 return list; |
1456 return list; |
1818 } |
1457 } |
1819 } |
1458 } |
1820 |
1459 |
1821 // VarHandle mechanics |
1460 // Unsafe |
1822 private static final VarHandle STATE; |
1461 private static final Unsafe U = Unsafe.getUnsafe(); |
1823 private static final VarHandle HEAD; |
1462 private static final long STATE |
1824 private static final VarHandle TAIL; |
1463 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state"); |
1464 private static final long HEAD |
|
1465 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head"); |
|
1466 private static final long TAIL |
|
1467 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail"); |
|
1825 |
1468 |
1826 static { |
1469 static { |
1827 try { |
|
1828 MethodHandles.Lookup l = MethodHandles.lookup(); |
|
1829 STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class); |
|
1830 HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class); |
|
1831 TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class); |
|
1832 } catch (ReflectiveOperationException e) { |
|
1833 throw new ExceptionInInitializerError(e); |
|
1834 } |
|
1835 |
|
1836 // Reduce the risk of rare disastrous classloading in first call to |
|
1837 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 |
|
1838 Class<?> ensureLoaded = LockSupport.class; |
1470 Class<?> ensureLoaded = LockSupport.class; |
1839 } |
1471 } |
1840 |
|
1841 /** |
|
1842 * Initializes head and tail fields on first contention. |
|
1843 */ |
|
1844 private final void initializeSyncQueue() { |
|
1845 Node h; |
|
1846 if (HEAD.compareAndSet(this, null, (h = new Node()))) |
|
1847 tail = h; |
|
1848 } |
|
1849 |
|
1850 /** |
|
1851 * CASes tail field. |
|
1852 */ |
|
1853 private final boolean compareAndSetTail(Node expect, Node update) { |
|
1854 return TAIL.compareAndSet(this, expect, update); |
|
1855 } |
|
1856 } |
1472 } |