jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java
changeset 2 90ce3da70b43
child 3414 cdf768813b4d
equal deleted inserted replaced
0:fd16c54261b3 2:90ce3da70b43
       
     1 /*
       
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     3  *
       
     4  * This code is free software; you can redistribute it and/or modify it
       
     5  * under the terms of the GNU General Public License version 2 only, as
       
     6  * published by the Free Software Foundation.  Sun designates this
       
     7  * particular file as subject to the "Classpath" exception as provided
       
     8  * by Sun in the LICENSE file that accompanied this code.
       
     9  *
       
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    13  * version 2 for more details (a copy is included in the LICENSE file that
       
    14  * accompanied this code).
       
    15  *
       
    16  * You should have received a copy of the GNU General Public License version
       
    17  * 2 along with this work; if not, write to the Free Software Foundation,
       
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    19  *
       
    20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    21  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    22  * have any questions.
       
    23  */
       
    24 
       
    25 /*
       
    26  * This file is available under and governed by the GNU General Public
       
    27  * License version 2 only, as published by the Free Software Foundation.
       
    28  * However, the following notice accompanied the original version of this
       
    29  * file:
       
    30  *
       
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
       
    32  * Expert Group and released to the public domain, as explained at
       
    33  * http://creativecommons.org/licenses/publicdomain
       
    34  */
       
    35 
       
    36 package java.util.concurrent;
       
    37 import java.util.*;
       
    38 import java.util.concurrent.atomic.*;
       
    39 
       
    40 
       
    41 /**
       
    42  * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
       
    43  * This queue orders elements FIFO (first-in-first-out).
       
    44  * The <em>head</em> of the queue is that element that has been on the
       
    45  * queue the longest time.
       
    46  * The <em>tail</em> of the queue is that element that has been on the
       
    47  * queue the shortest time. New elements
       
    48  * are inserted at the tail of the queue, and the queue retrieval
       
    49  * operations obtain elements at the head of the queue.
       
    50  * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
       
    51  * many threads will share access to a common collection.
       
    52  * This queue does not permit <tt>null</tt> elements.
       
    53  *
       
    54  * <p>This implementation employs an efficient &quot;wait-free&quot;
       
    55  * algorithm based on one described in <a
       
    56  * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
       
    57  * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
       
    58  * Algorithms</a> by Maged M. Michael and Michael L. Scott.
       
    59  *
       
    60  * <p>Beware that, unlike in most collections, the <tt>size</tt> method
       
    61  * is <em>NOT</em> a constant-time operation. Because of the
       
    62  * asynchronous nature of these queues, determining the current number
       
    63  * of elements requires a traversal of the elements.
       
    64  *
       
    65  * <p>This class and its iterator implement all of the
       
    66  * <em>optional</em> methods of the {@link Collection} and {@link
       
    67  * Iterator} interfaces.
       
    68  *
       
    69  * <p>Memory consistency effects: As with other concurrent
       
    70  * collections, actions in a thread prior to placing an object into a
       
    71  * {@code ConcurrentLinkedQueue}
       
    72  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
       
    73  * actions subsequent to the access or removal of that element from
       
    74  * the {@code ConcurrentLinkedQueue} in another thread.
       
    75  *
       
    76  * <p>This class is a member of the
       
    77  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
       
    78  * Java Collections Framework</a>.
       
    79  *
       
    80  * @since 1.5
       
    81  * @author Doug Lea
       
    82  * @param <E> the type of elements held in this collection
       
    83  *
       
    84  */
       
    85 public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
       
    86         implements Queue<E>, java.io.Serializable {
       
    87     private static final long serialVersionUID = 196745693267521676L;
       
    88 
       
    89     /*
       
    90      * This is a straight adaptation of Michael & Scott algorithm.
       
    91      * For explanation, read the paper.  The only (minor) algorithmic
       
    92      * difference is that this version supports lazy deletion of
       
    93      * internal nodes (method remove(Object)) -- remove CAS'es item
       
    94      * fields to null. The normal queue operations unlink but then
       
    95      * pass over nodes with null item fields. Similarly, iteration
       
    96      * methods ignore those with nulls.
       
    97      *
       
    98      * Also note that like most non-blocking algorithms in this
       
    99      * package, this implementation relies on the fact that in garbage
       
   100      * collected systems, there is no possibility of ABA problems due
       
   101      * to recycled nodes, so there is no need to use "counted
       
   102      * pointers" or related techniques seen in versions used in
       
   103      * non-GC'ed settings.
       
   104      */
       
   105 
       
   106     private static class Node<E> {
       
   107         private volatile E item;
       
   108         private volatile Node<E> next;
       
   109 
       
   110         private static final
       
   111             AtomicReferenceFieldUpdater<Node, Node>
       
   112             nextUpdater =
       
   113             AtomicReferenceFieldUpdater.newUpdater
       
   114             (Node.class, Node.class, "next");
       
   115         private static final
       
   116             AtomicReferenceFieldUpdater<Node, Object>
       
   117             itemUpdater =
       
   118             AtomicReferenceFieldUpdater.newUpdater
       
   119             (Node.class, Object.class, "item");
       
   120 
       
   121         Node(E x) { item = x; }
       
   122 
       
   123         Node(E x, Node<E> n) { item = x; next = n; }
       
   124 
       
   125         E getItem() {
       
   126             return item;
       
   127         }
       
   128 
       
   129         boolean casItem(E cmp, E val) {
       
   130             return itemUpdater.compareAndSet(this, cmp, val);
       
   131         }
       
   132 
       
   133         void setItem(E val) {
       
   134             itemUpdater.set(this, val);
       
   135         }
       
   136 
       
   137         Node<E> getNext() {
       
   138             return next;
       
   139         }
       
   140 
       
   141         boolean casNext(Node<E> cmp, Node<E> val) {
       
   142             return nextUpdater.compareAndSet(this, cmp, val);
       
   143         }
       
   144 
       
   145         void setNext(Node<E> val) {
       
   146             nextUpdater.set(this, val);
       
   147         }
       
   148 
       
   149     }
       
   150 
       
   151     private static final
       
   152         AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
       
   153         tailUpdater =
       
   154         AtomicReferenceFieldUpdater.newUpdater
       
   155         (ConcurrentLinkedQueue.class, Node.class, "tail");
       
   156     private static final
       
   157         AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
       
   158         headUpdater =
       
   159         AtomicReferenceFieldUpdater.newUpdater
       
   160         (ConcurrentLinkedQueue.class,  Node.class, "head");
       
   161 
       
   162     private boolean casTail(Node<E> cmp, Node<E> val) {
       
   163         return tailUpdater.compareAndSet(this, cmp, val);
       
   164     }
       
   165 
       
   166     private boolean casHead(Node<E> cmp, Node<E> val) {
       
   167         return headUpdater.compareAndSet(this, cmp, val);
       
   168     }
       
   169 
       
   170 
       
   171     /**
       
   172      * Pointer to header node, initialized to a dummy node.  The first
       
   173      * actual node is at head.getNext().
       
   174      */
       
   175     private transient volatile Node<E> head = new Node<E>(null, null);
       
   176 
       
   177     /** Pointer to last node on list **/
       
   178     private transient volatile Node<E> tail = head;
       
   179 
       
   180 
       
   181     /**
       
   182      * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
       
   183      */
       
   184     public ConcurrentLinkedQueue() {}
       
   185 
       
   186     /**
       
   187      * Creates a <tt>ConcurrentLinkedQueue</tt>
       
   188      * initially containing the elements of the given collection,
       
   189      * added in traversal order of the collection's iterator.
       
   190      * @param c the collection of elements to initially contain
       
   191      * @throws NullPointerException if the specified collection or any
       
   192      *         of its elements are null
       
   193      */
       
   194     public ConcurrentLinkedQueue(Collection<? extends E> c) {
       
   195         for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
       
   196             add(it.next());
       
   197     }
       
   198 
       
   199     // Have to override just to update the javadoc
       
   200 
       
   201     /**
       
   202      * Inserts the specified element at the tail of this queue.
       
   203      *
       
   204      * @return <tt>true</tt> (as specified by {@link Collection#add})
       
   205      * @throws NullPointerException if the specified element is null
       
   206      */
       
   207     public boolean add(E e) {
       
   208         return offer(e);
       
   209     }
       
   210 
       
   211     /**
       
   212      * Inserts the specified element at the tail of this queue.
       
   213      *
       
   214      * @return <tt>true</tt> (as specified by {@link Queue#offer})
       
   215      * @throws NullPointerException if the specified element is null
       
   216      */
       
   217     public boolean offer(E e) {
       
   218         if (e == null) throw new NullPointerException();
       
   219         Node<E> n = new Node<E>(e, null);
       
   220         for (;;) {
       
   221             Node<E> t = tail;
       
   222             Node<E> s = t.getNext();
       
   223             if (t == tail) {
       
   224                 if (s == null) {
       
   225                     if (t.casNext(s, n)) {
       
   226                         casTail(t, n);
       
   227                         return true;
       
   228                     }
       
   229                 } else {
       
   230                     casTail(t, s);
       
   231                 }
       
   232             }
       
   233         }
       
   234     }
       
   235 
       
   236     public E poll() {
       
   237         for (;;) {
       
   238             Node<E> h = head;
       
   239             Node<E> t = tail;
       
   240             Node<E> first = h.getNext();
       
   241             if (h == head) {
       
   242                 if (h == t) {
       
   243                     if (first == null)
       
   244                         return null;
       
   245                     else
       
   246                         casTail(t, first);
       
   247                 } else if (casHead(h, first)) {
       
   248                     E item = first.getItem();
       
   249                     if (item != null) {
       
   250                         first.setItem(null);
       
   251                         return item;
       
   252                     }
       
   253                     // else skip over deleted item, continue loop,
       
   254                 }
       
   255             }
       
   256         }
       
   257     }
       
   258 
       
   259     public E peek() { // same as poll except don't remove item
       
   260         for (;;) {
       
   261             Node<E> h = head;
       
   262             Node<E> t = tail;
       
   263             Node<E> first = h.getNext();
       
   264             if (h == head) {
       
   265                 if (h == t) {
       
   266                     if (first == null)
       
   267                         return null;
       
   268                     else
       
   269                         casTail(t, first);
       
   270                 } else {
       
   271                     E item = first.getItem();
       
   272                     if (item != null)
       
   273                         return item;
       
   274                     else // remove deleted node and continue
       
   275                         casHead(h, first);
       
   276                 }
       
   277             }
       
   278         }
       
   279     }
       
   280 
       
   281     /**
       
   282      * Returns the first actual (non-header) node on list.  This is yet
       
   283      * another variant of poll/peek; here returning out the first
       
   284      * node, not element (so we cannot collapse with peek() without
       
   285      * introducing race.)
       
   286      */
       
   287     Node<E> first() {
       
   288         for (;;) {
       
   289             Node<E> h = head;
       
   290             Node<E> t = tail;
       
   291             Node<E> first = h.getNext();
       
   292             if (h == head) {
       
   293                 if (h == t) {
       
   294                     if (first == null)
       
   295                         return null;
       
   296                     else
       
   297                         casTail(t, first);
       
   298                 } else {
       
   299                     if (first.getItem() != null)
       
   300                         return first;
       
   301                     else // remove deleted node and continue
       
   302                         casHead(h, first);
       
   303                 }
       
   304             }
       
   305         }
       
   306     }
       
   307 
       
   308 
       
   309     /**
       
   310      * Returns <tt>true</tt> if this queue contains no elements.
       
   311      *
       
   312      * @return <tt>true</tt> if this queue contains no elements
       
   313      */
       
   314     public boolean isEmpty() {
       
   315         return first() == null;
       
   316     }
       
   317 
       
   318     /**
       
   319      * Returns the number of elements in this queue.  If this queue
       
   320      * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
       
   321      * <tt>Integer.MAX_VALUE</tt>.
       
   322      *
       
   323      * <p>Beware that, unlike in most collections, this method is
       
   324      * <em>NOT</em> a constant-time operation. Because of the
       
   325      * asynchronous nature of these queues, determining the current
       
   326      * number of elements requires an O(n) traversal.
       
   327      *
       
   328      * @return the number of elements in this queue
       
   329      */
       
   330     public int size() {
       
   331         int count = 0;
       
   332         for (Node<E> p = first(); p != null; p = p.getNext()) {
       
   333             if (p.getItem() != null) {
       
   334                 // Collections.size() spec says to max out
       
   335                 if (++count == Integer.MAX_VALUE)
       
   336                     break;
       
   337             }
       
   338         }
       
   339         return count;
       
   340     }
       
   341 
       
   342     /**
       
   343      * Returns <tt>true</tt> if this queue contains the specified element.
       
   344      * More formally, returns <tt>true</tt> if and only if this queue contains
       
   345      * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
       
   346      *
       
   347      * @param o object to be checked for containment in this queue
       
   348      * @return <tt>true</tt> if this queue contains the specified element
       
   349      */
       
   350     public boolean contains(Object o) {
       
   351         if (o == null) return false;
       
   352         for (Node<E> p = first(); p != null; p = p.getNext()) {
       
   353             E item = p.getItem();
       
   354             if (item != null &&
       
   355                 o.equals(item))
       
   356                 return true;
       
   357         }
       
   358         return false;
       
   359     }
       
   360 
       
   361     /**
       
   362      * Removes a single instance of the specified element from this queue,
       
   363      * if it is present.  More formally, removes an element <tt>e</tt> such
       
   364      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
       
   365      * elements.
       
   366      * Returns <tt>true</tt> if this queue contained the specified element
       
   367      * (or equivalently, if this queue changed as a result of the call).
       
   368      *
       
   369      * @param o element to be removed from this queue, if present
       
   370      * @return <tt>true</tt> if this queue changed as a result of the call
       
   371      */
       
   372     public boolean remove(Object o) {
       
   373         if (o == null) return false;
       
   374         for (Node<E> p = first(); p != null; p = p.getNext()) {
       
   375             E item = p.getItem();
       
   376             if (item != null &&
       
   377                 o.equals(item) &&
       
   378                 p.casItem(item, null))
       
   379                 return true;
       
   380         }
       
   381         return false;
       
   382     }
       
   383 
       
   384     /**
       
   385      * Returns an array containing all of the elements in this queue, in
       
   386      * proper sequence.
       
   387      *
       
   388      * <p>The returned array will be "safe" in that no references to it are
       
   389      * maintained by this queue.  (In other words, this method must allocate
       
   390      * a new array).  The caller is thus free to modify the returned array.
       
   391      *
       
   392      * <p>This method acts as bridge between array-based and collection-based
       
   393      * APIs.
       
   394      *
       
   395      * @return an array containing all of the elements in this queue
       
   396      */
       
   397     public Object[] toArray() {
       
   398         // Use ArrayList to deal with resizing.
       
   399         ArrayList<E> al = new ArrayList<E>();
       
   400         for (Node<E> p = first(); p != null; p = p.getNext()) {
       
   401             E item = p.getItem();
       
   402             if (item != null)
       
   403                 al.add(item);
       
   404         }
       
   405         return al.toArray();
       
   406     }
       
   407 
       
   408     /**
       
   409      * Returns an array containing all of the elements in this queue, in
       
   410      * proper sequence; the runtime type of the returned array is that of
       
   411      * the specified array.  If the queue fits in the specified array, it
       
   412      * is returned therein.  Otherwise, a new array is allocated with the
       
   413      * runtime type of the specified array and the size of this queue.
       
   414      *
       
   415      * <p>If this queue fits in the specified array with room to spare
       
   416      * (i.e., the array has more elements than this queue), the element in
       
   417      * the array immediately following the end of the queue is set to
       
   418      * <tt>null</tt>.
       
   419      *
       
   420      * <p>Like the {@link #toArray()} method, this method acts as bridge between
       
   421      * array-based and collection-based APIs.  Further, this method allows
       
   422      * precise control over the runtime type of the output array, and may,
       
   423      * under certain circumstances, be used to save allocation costs.
       
   424      *
       
   425      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
       
   426      * The following code can be used to dump the queue into a newly
       
   427      * allocated array of <tt>String</tt>:
       
   428      *
       
   429      * <pre>
       
   430      *     String[] y = x.toArray(new String[0]);</pre>
       
   431      *
       
   432      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
       
   433      * <tt>toArray()</tt>.
       
   434      *
       
   435      * @param a the array into which the elements of the queue are to
       
   436      *          be stored, if it is big enough; otherwise, a new array of the
       
   437      *          same runtime type is allocated for this purpose
       
   438      * @return an array containing all of the elements in this queue
       
   439      * @throws ArrayStoreException if the runtime type of the specified array
       
   440      *         is not a supertype of the runtime type of every element in
       
   441      *         this queue
       
   442      * @throws NullPointerException if the specified array is null
       
   443      */
       
   444     public <T> T[] toArray(T[] a) {
       
   445         // try to use sent-in array
       
   446         int k = 0;
       
   447         Node<E> p;
       
   448         for (p = first(); p != null && k < a.length; p = p.getNext()) {
       
   449             E item = p.getItem();
       
   450             if (item != null)
       
   451                 a[k++] = (T)item;
       
   452         }
       
   453         if (p == null) {
       
   454             if (k < a.length)
       
   455                 a[k] = null;
       
   456             return a;
       
   457         }
       
   458 
       
   459         // If won't fit, use ArrayList version
       
   460         ArrayList<E> al = new ArrayList<E>();
       
   461         for (Node<E> q = first(); q != null; q = q.getNext()) {
       
   462             E item = q.getItem();
       
   463             if (item != null)
       
   464                 al.add(item);
       
   465         }
       
   466         return al.toArray(a);
       
   467     }
       
   468 
       
   469     /**
       
   470      * Returns an iterator over the elements in this queue in proper sequence.
       
   471      * The returned iterator is a "weakly consistent" iterator that
       
   472      * will never throw {@link ConcurrentModificationException},
       
   473      * and guarantees to traverse elements as they existed upon
       
   474      * construction of the iterator, and may (but is not guaranteed to)
       
   475      * reflect any modifications subsequent to construction.
       
   476      *
       
   477      * @return an iterator over the elements in this queue in proper sequence
       
   478      */
       
   479     public Iterator<E> iterator() {
       
   480         return new Itr();
       
   481     }
       
   482 
       
   483     private class Itr implements Iterator<E> {
       
   484         /**
       
   485          * Next node to return item for.
       
   486          */
       
   487         private Node<E> nextNode;
       
   488 
       
   489         /**
       
   490          * nextItem holds on to item fields because once we claim
       
   491          * that an element exists in hasNext(), we must return it in
       
   492          * the following next() call even if it was in the process of
       
   493          * being removed when hasNext() was called.
       
   494          */
       
   495         private E nextItem;
       
   496 
       
   497         /**
       
   498          * Node of the last returned item, to support remove.
       
   499          */
       
   500         private Node<E> lastRet;
       
   501 
       
   502         Itr() {
       
   503             advance();
       
   504         }
       
   505 
       
   506         /**
       
   507          * Moves to next valid node and returns item to return for
       
   508          * next(), or null if no such.
       
   509          */
       
   510         private E advance() {
       
   511             lastRet = nextNode;
       
   512             E x = nextItem;
       
   513 
       
   514             Node<E> p = (nextNode == null)? first() : nextNode.getNext();
       
   515             for (;;) {
       
   516                 if (p == null) {
       
   517                     nextNode = null;
       
   518                     nextItem = null;
       
   519                     return x;
       
   520                 }
       
   521                 E item = p.getItem();
       
   522                 if (item != null) {
       
   523                     nextNode = p;
       
   524                     nextItem = item;
       
   525                     return x;
       
   526                 } else // skip over nulls
       
   527                     p = p.getNext();
       
   528             }
       
   529         }
       
   530 
       
   531         public boolean hasNext() {
       
   532             return nextNode != null;
       
   533         }
       
   534 
       
   535         public E next() {
       
   536             if (nextNode == null) throw new NoSuchElementException();
       
   537             return advance();
       
   538         }
       
   539 
       
   540         public void remove() {
       
   541             Node<E> l = lastRet;
       
   542             if (l == null) throw new IllegalStateException();
       
   543             // rely on a future traversal to relink.
       
   544             l.setItem(null);
       
   545             lastRet = null;
       
   546         }
       
   547     }
       
   548 
       
   549     /**
       
   550      * Save the state to a stream (that is, serialize it).
       
   551      *
       
   552      * @serialData All of the elements (each an <tt>E</tt>) in
       
   553      * the proper order, followed by a null
       
   554      * @param s the stream
       
   555      */
       
   556     private void writeObject(java.io.ObjectOutputStream s)
       
   557         throws java.io.IOException {
       
   558 
       
   559         // Write out any hidden stuff
       
   560         s.defaultWriteObject();
       
   561 
       
   562         // Write out all elements in the proper order.
       
   563         for (Node<E> p = first(); p != null; p = p.getNext()) {
       
   564             Object item = p.getItem();
       
   565             if (item != null)
       
   566                 s.writeObject(item);
       
   567         }
       
   568 
       
   569         // Use trailing null as sentinel
       
   570         s.writeObject(null);
       
   571     }
       
   572 
       
   573     /**
       
   574      * Reconstitute the Queue instance from a stream (that is,
       
   575      * deserialize it).
       
   576      * @param s the stream
       
   577      */
       
   578     private void readObject(java.io.ObjectInputStream s)
       
   579         throws java.io.IOException, ClassNotFoundException {
       
   580         // Read in capacity, and any hidden stuff
       
   581         s.defaultReadObject();
       
   582         head = new Node<E>(null, null);
       
   583         tail = head;
       
   584         // Read in all elements and place in queue
       
   585         for (;;) {
       
   586             E item = (E)s.readObject();
       
   587             if (item == null)
       
   588                 break;
       
   589             else
       
   590                 offer(item);
       
   591         }
       
   592     }
       
   593 
       
   594 }