|
1 /* |
|
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
3 * |
|
4 * This code is free software; you can redistribute it and/or modify it |
|
5 * under the terms of the GNU General Public License version 2 only, as |
|
6 * published by the Free Software Foundation. Sun designates this |
|
7 * particular file as subject to the "Classpath" exception as provided |
|
8 * by Sun in the LICENSE file that accompanied this code. |
|
9 * |
|
10 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
13 * version 2 for more details (a copy is included in the LICENSE file that |
|
14 * accompanied this code). |
|
15 * |
|
16 * You should have received a copy of the GNU General Public License version |
|
17 * 2 along with this work; if not, write to the Free Software Foundation, |
|
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
19 * |
|
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, |
|
21 * CA 95054 USA or visit www.sun.com if you need additional information or |
|
22 * have any questions. |
|
23 */ |
|
24 |
|
25 /* |
|
26 * This file is available under and governed by the GNU General Public |
|
27 * License version 2 only, as published by the Free Software Foundation. |
|
28 * However, the following notice accompanied the original version of this |
|
29 * file: |
|
30 * |
|
31 * Written by Doug Lea with assistance from members of JCP JSR-166 |
|
32 * Expert Group and released to the public domain, as explained at |
|
33 * http://creativecommons.org/licenses/publicdomain |
|
34 */ |
|
35 |
|
36 package java.util.concurrent; |
|
37 import java.util.*; |
|
38 import java.util.concurrent.atomic.*; |
|
39 |
|
40 |
|
/**
 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
 * many threads will share access to a common collection.
 * This queue does not permit <tt>null</tt> elements.
 *
 * <p>This implementation employs an efficient <em>non-blocking</em>
 * algorithm based on one described in <a
 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 * Algorithms</a> by Maged M. Michael and Michael L. Scott.
 *
 * <p>Beware that, unlike in most collections, the <tt>size</tt> method
 * is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code ConcurrentLinkedQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code ConcurrentLinkedQueue} in another thread.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    /*
     * This is a straight adaptation of Michael & Scott algorithm.
     * For explanation, read the paper.  The only (minor) algorithmic
     * difference is that this version supports lazy deletion of
     * internal nodes (method remove(Object)) -- remove CAS'es item
     * fields to null. The normal queue operations unlink but then
     * pass over nodes with null item fields. Similarly, iteration
     * methods ignore those with nulls.
     *
     * Because an item can be claimed either by poll() or by a lazy
     * deletion, poll() also claims its element with a CAS on the item
     * field; whichever operation wins the CAS is the one that reports
     * the element as removed.
     *
     * Also note that like most non-blocking algorithms in this
     * package, this implementation relies on the fact that in garbage
     * collected systems, there is no possibility of ABA problems due
     * to recycled nodes, so there is no need to use "counted
     * pointers" or related techniques seen in versions used in
     * non-GC'ed settings.
     */

    /**
     * Singly linked list node. A node whose item is null is either the
     * dummy header or a lazily deleted element.
     */
    private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        // Raw types are unavoidable here: class literals cannot be generic.
        private static final
            AtomicReferenceFieldUpdater<Node, Node>
            nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Node.class, "next");
        private static final
            AtomicReferenceFieldUpdater<Node, Object>
            itemUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Object.class, "item");

        Node(E x) { item = x; }

        Node(E x, Node<E> n) { item = x; next = n; }

        E getItem() {
            return item;
        }

        /** Atomically sets the item to val if it currently is cmp. */
        boolean casItem(E cmp, E val) {
            return itemUpdater.compareAndSet(this, cmp, val);
        }

        void setItem(E val) {
            itemUpdater.set(this, val);
        }

        Node<E> getNext() {
            return next;
        }

        /** Atomically sets the successor to val if it currently is cmp. */
        boolean casNext(Node<E> cmp, Node<E> val) {
            return nextUpdater.compareAndSet(this, cmp, val);
        }

        void setNext(Node<E> val) {
            nextUpdater.set(this, val);
        }

    }

    private static final
        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
        tailUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (ConcurrentLinkedQueue.class, Node.class, "tail");
    private static final
        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
        headUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (ConcurrentLinkedQueue.class, Node.class, "head");

    private boolean casTail(Node<E> cmp, Node<E> val) {
        return tailUpdater.compareAndSet(this, cmp, val);
    }

    private boolean casHead(Node<E> cmp, Node<E> val) {
        return headUpdater.compareAndSet(this, cmp, val);
    }


    /**
     * Pointer to header node, initialized to a dummy node.  The first
     * actual node is at head.getNext().
     */
    private transient volatile Node<E> head = new Node<E>(null, null);

    /** Pointer to last node on list **/
    private transient volatile Node<E> tail = head;


    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
     */
    public ConcurrentLinkedQueue() {}

    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt>
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        for (E e : c)
            add(e);
    }

    // Have to override just to update the javadoc

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @return <tt>true</tt> (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e, null);
        for (;;) {
            Node<E> t = tail;
            Node<E> s = t.getNext();
            if (t == tail) {            // snapshot of tail/next is consistent
                if (s == null) {
                    if (t.casNext(s, n)) {
                        // Linked in; swinging tail is best-effort because
                        // any other thread will help if this CAS fails.
                        casTail(t, n);
                        return true;
                    }
                } else {
                    // tail is lagging behind: help advance it, then retry
                    casTail(t, s);
                }
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);  // tail is lagging; help out
                } else if (casHead(h, first)) {
                    E item = first.getItem();
                    // Claim the item with a CAS rather than a plain write:
                    // remove(Object) or an iterator may concurrently null
                    // it out, and only one party may report the removal.
                    if (item != null && first.casItem(item, null))
                        return item;
                    // else skip over deleted item, continue loop,
                }
            }
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() { // same as poll except don't remove item
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);
                } else {
                    E item = first.getItem();
                    if (item != null)
                        return item;
                    else // remove deleted node and continue
                        casHead(h, first);
                }
            }
        }
    }

    /**
     * Returns the first actual (non-header) node on list.  This is yet
     * another variant of poll/peek; here returning out the first
     * node, not element (so we cannot collapse with peek() without
     * introducing race.)
     */
    Node<E> first() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);
                } else {
                    if (first.getItem() != null)
                        return first;
                    else // remove deleted node and continue
                        casHead(h, first);
                }
            }
        }
    }


    /**
     * Returns <tt>true</tt> if this queue contains no elements.
     *
     * @return <tt>true</tt> if this queue contains no elements
     */
    public boolean isEmpty() {
        return first() == null;
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            if (p.getItem() != null) {
                // Collections.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
            }
        }
        return count;
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     * More formally, returns <tt>true</tt> if and only if this queue contains
     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null &&
                o.equals(item))
                return true;
        }
        return false;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element <tt>e</tt> such
     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
     * elements.
     * Returns <tt>true</tt> if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            // Lazy deletion: the node stays linked; a later traversal
            // from the head unlinks it. Losing the CAS means another
            // thread took this item, so keep searching.
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null))
                return true;
        }
        return false;
    }

    /**
     * Collects the currently live (non-deleted) elements, in queue
     * order, into a fresh list. Shared by both toArray variants.
     */
    private ArrayList<E> toArrayList() {
        // Use ArrayList to deal with resizing.
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null)
                al.add(item);
        }
        return al;
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        return toArrayList().toArray();
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked") // the array store check enforces T at run time
    public <T> T[] toArray(T[] a) {
        // try to use sent-in array
        int k = 0;
        Node<E> p;
        for (p = first(); p != null && k < a.length; p = p.getNext()) {
            E item = p.getItem();
            if (item != null)
                a[k++] = (T)item;
        }
        if (p == null) {
            // Reached the end of the list, so everything fit.
            if (k < a.length)
                a[k] = null;
            return a;
        }

        // If won't fit, use ArrayList version
        return toArrayList().toArray(a);
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Node of the last returned item, to support remove.
         */
        private Node<E> lastRet;

        Itr() {
            advance();
        }

        /**
         * Moves to next valid node and returns item to return for
         * next(), or null if no such.
         */
        private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            Node<E> p = (nextNode == null) ? first() : nextNode.getNext();
            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.getItem();
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else // skip over nulls
                    p = p.getNext();
            }
        }

        public boolean hasNext() {
            return nextNode != null;
        }

        public E next() {
            if (nextNode == null) throw new NoSuchElementException();
            return advance();
        }

        public void remove() {
            Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            // rely on a future traversal to relink.
            l.setItem(null);
            lastRet = null;
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData All of the elements (each an <tt>E</tt>) in
     * the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        // Write out any hidden stuff
        s.defaultWriteObject();

        // Write out all elements in the proper order.
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            Object item = p.getItem();
            if (item != null)
                s.writeObject(item);
        }

        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    @SuppressWarnings("unchecked") // elements were written as E by writeObject
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();
        // Field initializers do not run during deserialization, so the
        // dummy header has to be rebuilt here.
        head = new Node<E>(null, null);
        tail = head;
        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            else
                offer(item);
        }
    }

}