jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java
changeset 3414 cdf768813b4d
parent 2 90ce3da70b43
child 3419 3ae6dc68c20d
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java	Tue Jul 28 11:15:49 2009 +0800
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java	Tue Jul 28 13:24:52 2009 -0700
@@ -34,9 +34,13 @@
  */
 
 package java.util.concurrent;
-import java.util.*;
-import java.util.concurrent.atomic.*;
 
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
 
 /**
  * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
@@ -47,9 +51,9 @@
  * queue the shortest time. New elements
  * are inserted at the tail of the queue, and the queue retrieval
  * operations obtain elements at the head of the queue.
- * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
+ * A {@code ConcurrentLinkedQueue} is an appropriate choice when
  * many threads will share access to a common collection.
- * This queue does not permit <tt>null</tt> elements.
+ * This queue does not permit {@code null} elements.
  *
  * <p>This implementation employs an efficient &quot;wait-free&quot;
  * algorithm based on one described in <a
@@ -57,7 +61,7 @@
  * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
  * Algorithms</a> by Maged M. Michael and Michael L. Scott.
  *
- * <p>Beware that, unlike in most collections, the <tt>size</tt> method
+ * <p>Beware that, unlike in most collections, the {@code size} method
  * is <em>NOT</em> a constant-time operation. Because of the
  * asynchronous nature of these queues, determining the current number
  * of elements requires a traversal of the elements.
@@ -87,51 +91,102 @@
     private static final long serialVersionUID = 196745693267521676L;
 
     /*
-     * This is a straight adaptation of Michael & Scott algorithm.
-     * For explanation, read the paper.  The only (minor) algorithmic
-     * difference is that this version supports lazy deletion of
-     * internal nodes (method remove(Object)) -- remove CAS'es item
-     * fields to null. The normal queue operations unlink but then
-     * pass over nodes with null item fields. Similarly, iteration
-     * methods ignore those with nulls.
+     * This is a modification of the Michael & Scott algorithm,
+     * adapted for a garbage-collected environment, with support for
+     * interior node deletion (to support remove(Object)).  For
+     * explanation, read the paper.
      *
-     * Also note that like most non-blocking algorithms in this
-     * package, this implementation relies on the fact that in garbage
+     * Note that like most non-blocking algorithms in this package,
+     * this implementation relies on the fact that in garbage
      * collected systems, there is no possibility of ABA problems due
      * to recycled nodes, so there is no need to use "counted
      * pointers" or related techniques seen in versions used in
      * non-GC'ed settings.
+     *
+     * The fundamental invariants are:
+     * - There is exactly one (last) Node with a null next reference,
+     *   which is CASed when enqueueing.  This last Node can be
+     *   reached in O(1) time from tail, but tail is merely an
+     *   optimization - it can always be reached in O(N) time from
+     *   head as well.
+     * - The elements contained in the queue are the non-null items in
+     *   Nodes that are reachable from head.  CASing the item
+     *   reference of a Node to null atomically removes it from the
+     *   queue.  Reachability of all elements from head must remain
+     *   true even in the case of concurrent modifications that cause
+     *   head to advance.  A dequeued Node may remain in use
+     *   indefinitely due to creation of an Iterator or simply a
+     *   poll() that has lost its time slice.
+     *
+     * The above might appear to imply that all Nodes are GC-reachable
+     * from a predecessor dequeued Node.  That would cause two problems:
+     * - allow a rogue Iterator to cause unbounded memory retention
+     * - cause cross-generational linking of old Nodes to new Nodes if
+     *   a Node was tenured while live, which generational GCs have a
+     *   hard time dealing with, causing repeated major collections.
+     * However, only non-deleted Nodes need to be reachable from
+     * dequeued Nodes, and reachability does not necessarily have to
+     * be of the kind understood by the GC.  We use the trick of
+     * linking a Node that has just been dequeued to itself.  Such a
+     * self-link implicitly means to advance to head.
+     *
+     * Both head and tail are permitted to lag.  In fact, failing to
+     * update them every time one could is a significant optimization
+     * (fewer CASes). This is controlled by local "hops" variables
+     * that only trigger helping-CASes after experiencing multiple
+     * lags.
+     *
+     * Since head and tail are updated concurrently and independently,
+     * it is possible for tail to lag behind head (why not)?
+     *
+     * CASing a Node's item reference to null atomically removes the
+     * element from the queue.  Iterators skip over Nodes with null
+     * items.  Prior implementations of this class had a race between
+     * poll() and remove(Object) where the same element would appear
+     * to be successfully removed by two concurrent operations.  The
+     * method remove(Object) also lazily unlinks deleted Nodes, but
+     * this is merely an optimization.
+     *
+     * When constructing a Node (before enqueuing it) we avoid paying
+     * for a volatile write to item by using lazySet instead of a
+     * normal write.  This allows the cost of enqueue to be
+     * "one-and-a-half" CASes.
+     *
+     * Both head and tail may or may not point to a Node with a
+     * non-null item.  If the queue is empty, all items must of course
+     * be null.  Upon creation, both head and tail refer to a dummy
+     * Node with null item.  Both head and tail are only updated using
+     * CAS, so they never regress, although again this is merely an
+     * optimization.
      */
 
     private static class Node<E> {
         private volatile E item;
         private volatile Node<E> next;
 
-        private static final
-            AtomicReferenceFieldUpdater<Node, Node>
-            nextUpdater =
-            AtomicReferenceFieldUpdater.newUpdater
-            (Node.class, Node.class, "next");
-        private static final
-            AtomicReferenceFieldUpdater<Node, Object>
-            itemUpdater =
-            AtomicReferenceFieldUpdater.newUpdater
-            (Node.class, Object.class, "item");
-
-        Node(E x) { item = x; }
-
-        Node(E x, Node<E> n) { item = x; next = n; }
+        Node(E item) {
+            // Piggyback on imminent casNext()
+            lazySetItem(item);
+        }
 
         E getItem() {
             return item;
         }
 
         boolean casItem(E cmp, E val) {
-            return itemUpdater.compareAndSet(this, cmp, val);
+            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
         }
 
         void setItem(E val) {
-            itemUpdater.set(this, val);
+            item = val;
+        }
+
+        void lazySetItem(E val) {
+            UNSAFE.putOrderedObject(this, itemOffset, val);
+        }
+
+        void lazySetNext(Node<E> val) {
+            UNSAFE.putOrderedObject(this, nextOffset, val);
         }
 
         Node<E> getNext() {
@@ -139,52 +194,55 @@
         }
 
         boolean casNext(Node<E> cmp, Node<E> val) {
-            return nextUpdater.compareAndSet(this, cmp, val);
+            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
         }
 
-        void setNext(Node<E> val) {
-            nextUpdater.set(this, val);
-        }
+        // Unsafe mechanics
 
+        private static final sun.misc.Unsafe UNSAFE =
+            sun.misc.Unsafe.getUnsafe();
+        private static final long nextOffset =
+            objectFieldOffset(UNSAFE, "next", Node.class);
+        private static final long itemOffset =
+            objectFieldOffset(UNSAFE, "item", Node.class);
     }
 
-    private static final
-        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
-        tailUpdater =
-        AtomicReferenceFieldUpdater.newUpdater
-        (ConcurrentLinkedQueue.class, Node.class, "tail");
-    private static final
-        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
-        headUpdater =
-        AtomicReferenceFieldUpdater.newUpdater
-        (ConcurrentLinkedQueue.class,  Node.class, "head");
-
-    private boolean casTail(Node<E> cmp, Node<E> val) {
-        return tailUpdater.compareAndSet(this, cmp, val);
-    }
-
-    private boolean casHead(Node<E> cmp, Node<E> val) {
-        return headUpdater.compareAndSet(this, cmp, val);
-    }
-
+    /**
+     * A node from which the first live (non-deleted) node (if any)
+     * can be reached in O(1) time.
+     * Invariants:
+     * - all live nodes are reachable from head via succ()
+     * - head != null
+     * - (tmp = head).next != tmp || tmp != head
+     * Non-invariants:
+     * - head.item may or may not be null.
+     * - it is permitted for tail to lag behind head, that is, for tail
+     *   to not be reachable from head!
+     */
+    private transient volatile Node<E> head = new Node<E>(null);
 
     /**
-     * Pointer to header node, initialized to a dummy node.  The first
-     * actual node is at head.getNext().
+     * A node from which the last node on list (that is, the unique
+     * node with node.next == null) can be reached in O(1) time.
+     * Invariants:
+     * - the last node is always reachable from tail via succ()
+     * - tail != null
+     * Non-invariants:
+     * - tail.item may or may not be null.
+     * - it is permitted for tail to lag behind head, that is, for tail
+     *   to not be reachable from head!
+     * - tail.next may or may not be self-pointing to tail.
      */
-    private transient volatile Node<E> head = new Node<E>(null, null);
-
-    /** Pointer to last node on list **/
     private transient volatile Node<E> tail = head;
 
 
     /**
-     * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
+     * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
      */
     public ConcurrentLinkedQueue() {}
 
     /**
-     * Creates a <tt>ConcurrentLinkedQueue</tt>
+     * Creates a {@code ConcurrentLinkedQueue}
      * initially containing the elements of the given collection,
      * added in traversal order of the collection's iterator.
      * @param c the collection of elements to initially contain
@@ -201,7 +259,7 @@
     /**
      * Inserts the specified element at the tail of this queue.
      *
-     * @return <tt>true</tt> (as specified by {@link Collection#add})
+     * @return {@code true} (as specified by {@link Collection#add})
      * @throws NullPointerException if the specified element is null
      */
     public boolean add(E e) {
@@ -209,107 +267,135 @@
     }
 
     /**
+     * We don't bother to update head or tail pointers if fewer than
+     * HOPS links from "true" location.  We assume that volatile
+     * writes are significantly more expensive than volatile reads.
+     */
+    private static final int HOPS = 1;
+
+    /**
+     * Try to CAS head to p. If successful, repoint old head to itself
+     * as sentinel for succ(), below.
+     */
+    final void updateHead(Node<E> h, Node<E> p) {
+        if (h != p && casHead(h, p))
+            h.lazySetNext(h);
+    }
+
+    /**
+     * Returns the successor of p, or the head node if p.next has been
+     * linked to self, which will only be true if traversing with a
+     * stale pointer that is now off the list.
+     */
+    final Node<E> succ(Node<E> p) {
+        Node<E> next = p.getNext();
+        return (p == next) ? head : next;
+    }
+
+    /**
      * Inserts the specified element at the tail of this queue.
      *
-     * @return <tt>true</tt> (as specified by {@link Queue#offer})
+     * @return {@code true} (as specified by {@link Queue#offer})
      * @throws NullPointerException if the specified element is null
      */
     public boolean offer(E e) {
         if (e == null) throw new NullPointerException();
-        Node<E> n = new Node<E>(e, null);
+        Node<E> n = new Node<E>(e);
+        retry:
         for (;;) {
             Node<E> t = tail;
-            Node<E> s = t.getNext();
-            if (t == tail) {
-                if (s == null) {
-                    if (t.casNext(s, n)) {
-                        casTail(t, n);
-                        return true;
-                    }
+            Node<E> p = t;
+            for (int hops = 0; ; hops++) {
+                Node<E> next = succ(p);
+                if (next != null) {
+                    if (hops > HOPS && t != tail)
+                        continue retry;
+                    p = next;
+                } else if (p.casNext(null, n)) {
+                    if (hops >= HOPS)
+                        casTail(t, n);  // Failure is OK.
+                    return true;
                 } else {
-                    casTail(t, s);
+                    p = succ(p);
                 }
             }
         }
     }
 
     public E poll() {
-        for (;;) {
-            Node<E> h = head;
-            Node<E> t = tail;
-            Node<E> first = h.getNext();
-            if (h == head) {
-                if (h == t) {
-                    if (first == null)
-                        return null;
-                    else
-                        casTail(t, first);
-                } else if (casHead(h, first)) {
-                    E item = first.getItem();
-                    if (item != null) {
-                        first.setItem(null);
-                        return item;
-                    }
-                    // else skip over deleted item, continue loop,
+        Node<E> h = head;
+        Node<E> p = h;
+        for (int hops = 0; ; hops++) {
+            E item = p.getItem();
+
+            if (item != null && p.casItem(item, null)) {
+                if (hops >= HOPS) {
+                    Node<E> q = p.getNext();
+                    updateHead(h, (q != null) ? q : p);
                 }
+                return item;
             }
+            Node<E> next = succ(p);
+            if (next == null) {
+                updateHead(h, p);
+                break;
+            }
+            p = next;
         }
+        return null;
     }
 
-    public E peek() { // same as poll except don't remove item
+    public E peek() {
+        Node<E> h = head;
+        Node<E> p = h;
+        E item;
         for (;;) {
-            Node<E> h = head;
-            Node<E> t = tail;
-            Node<E> first = h.getNext();
-            if (h == head) {
-                if (h == t) {
-                    if (first == null)
-                        return null;
-                    else
-                        casTail(t, first);
-                } else {
-                    E item = first.getItem();
-                    if (item != null)
-                        return item;
-                    else // remove deleted node and continue
-                        casHead(h, first);
-                }
+            item = p.getItem();
+            if (item != null)
+                break;
+            Node<E> next = succ(p);
+            if (next == null) {
+                break;
             }
+            p = next;
         }
+        updateHead(h, p);
+        return item;
     }
 
     /**
-     * Returns the first actual (non-header) node on list.  This is yet
-     * another variant of poll/peek; here returning out the first
-     * node, not element (so we cannot collapse with peek() without
-     * introducing race.)
+     * Returns the first live (non-deleted) node on list, or null if none.
+     * This is yet another variant of poll/peek; here returning the
+     * first node, not element.  We could make peek() a wrapper around
+     * first(), but that would cost an extra volatile read of item,
+     * and the need to add a retry loop to deal with the possibility
+     * of losing a race to a concurrent poll().
      */
     Node<E> first() {
+        Node<E> h = head;
+        Node<E> p = h;
+        Node<E> result;
         for (;;) {
-            Node<E> h = head;
-            Node<E> t = tail;
-            Node<E> first = h.getNext();
-            if (h == head) {
-                if (h == t) {
-                    if (first == null)
-                        return null;
-                    else
-                        casTail(t, first);
-                } else {
-                    if (first.getItem() != null)
-                        return first;
-                    else // remove deleted node and continue
-                        casHead(h, first);
-                }
+            E item = p.getItem();
+            if (item != null) {
+                result = p;
+                break;
             }
+            Node<E> next = succ(p);
+            if (next == null) {
+                result = null;
+                break;
+            }
+            p = next;
         }
+        updateHead(h, p);
+        return result;
     }
 
-
     /**
-     * Returns <tt>true</tt> if this queue contains no elements.
+     * Returns {@code true} if this queue contains no elements.
      *
-     * @return <tt>true</tt> if this queue contains no elements
+     * @return {@code true} if this queue contains no elements
      */
     public boolean isEmpty() {
         return first() == null;
@@ -317,8 +403,8 @@
 
     /**
      * Returns the number of elements in this queue.  If this queue
-     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
-     * <tt>Integer.MAX_VALUE</tt>.
+     * contains more than {@code Integer.MAX_VALUE} elements, returns
+     * {@code Integer.MAX_VALUE}.
      *
      * <p>Beware that, unlike in most collections, this method is
      * <em>NOT</em> a constant-time operation. Because of the
@@ -329,7 +415,7 @@
      */
     public int size() {
         int count = 0;
-        for (Node<E> p = first(); p != null; p = p.getNext()) {
+        for (Node<E> p = first(); p != null; p = succ(p)) {
             if (p.getItem() != null) {
                 // Collections.size() spec says to max out
                 if (++count == Integer.MAX_VALUE)
@@ -340,16 +426,16 @@
     }
 
     /**
-     * Returns <tt>true</tt> if this queue contains the specified element.
-     * More formally, returns <tt>true</tt> if and only if this queue contains
-     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
+     * Returns {@code true} if this queue contains the specified element.
+     * More formally, returns {@code true} if and only if this queue contains
+     * at least one element {@code e} such that {@code o.equals(e)}.
      *
      * @param o object to be checked for containment in this queue
-     * @return <tt>true</tt> if this queue contains the specified element
+     * @return {@code true} if this queue contains the specified element
      */
     public boolean contains(Object o) {
         if (o == null) return false;
-        for (Node<E> p = first(); p != null; p = p.getNext()) {
+        for (Node<E> p = first(); p != null; p = succ(p)) {
             E item = p.getItem();
             if (item != null &&
                 o.equals(item))
@@ -360,23 +446,29 @@
 
     /**
      * Removes a single instance of the specified element from this queue,
-     * if it is present.  More formally, removes an element <tt>e</tt> such
-     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
+     * if it is present.  More formally, removes an element {@code e} such
+     * that {@code o.equals(e)}, if this queue contains one or more such
      * elements.
-     * Returns <tt>true</tt> if this queue contained the specified element
+     * Returns {@code true} if this queue contained the specified element
      * (or equivalently, if this queue changed as a result of the call).
      *
      * @param o element to be removed from this queue, if present
-     * @return <tt>true</tt> if this queue changed as a result of the call
+     * @return {@code true} if this queue changed as a result of the call
      */
     public boolean remove(Object o) {
         if (o == null) return false;
-        for (Node<E> p = first(); p != null; p = p.getNext()) {
+        Node<E> pred = null;
+        for (Node<E> p = first(); p != null; p = succ(p)) {
             E item = p.getItem();
             if (item != null &&
                 o.equals(item) &&
-                p.casItem(item, null))
+                p.casItem(item, null)) {
+                Node<E> next = succ(p);
+                if (pred != null && next != null)
+                    pred.casNext(p, next);
                 return true;
+            }
+            pred = p;
         }
         return false;
     }
@@ -397,7 +489,7 @@
     public Object[] toArray() {
         // Use ArrayList to deal with resizing.
         ArrayList<E> al = new ArrayList<E>();
-        for (Node<E> p = first(); p != null; p = p.getNext()) {
+        for (Node<E> p = first(); p != null; p = succ(p)) {
             E item = p.getItem();
             if (item != null)
                 al.add(item);
@@ -415,22 +507,22 @@
      * <p>If this queue fits in the specified array with room to spare
      * (i.e., the array has more elements than this queue), the element in
      * the array immediately following the end of the queue is set to
-     * <tt>null</tt>.
+     * {@code null}.
      *
      * <p>Like the {@link #toArray()} method, this method acts as bridge between
      * array-based and collection-based APIs.  Further, this method allows
      * precise control over the runtime type of the output array, and may,
      * under certain circumstances, be used to save allocation costs.
      *
-     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
+     * <p>Suppose {@code x} is a queue known to contain only strings.
      * The following code can be used to dump the queue into a newly
-     * allocated array of <tt>String</tt>:
+     * allocated array of {@code String}:
      *
      * <pre>
      *     String[] y = x.toArray(new String[0]);</pre>
      *
-     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
-     * <tt>toArray()</tt>.
+     * Note that {@code toArray(new Object[0])} is identical in function to
+     * {@code toArray()}.
      *
      * @param a the array into which the elements of the queue are to
      *          be stored, if it is big enough; otherwise, a new array of the
@@ -441,11 +533,12 @@
      *         this queue
      * @throws NullPointerException if the specified array is null
      */
+    @SuppressWarnings("unchecked")
     public <T> T[] toArray(T[] a) {
         // try to use sent-in array
         int k = 0;
         Node<E> p;
-        for (p = first(); p != null && k < a.length; p = p.getNext()) {
+        for (p = first(); p != null && k < a.length; p = succ(p)) {
             E item = p.getItem();
             if (item != null)
                 a[k++] = (T)item;
@@ -458,7 +551,7 @@
 
         // If won't fit, use ArrayList version
         ArrayList<E> al = new ArrayList<E>();
-        for (Node<E> q = first(); q != null; q = q.getNext()) {
+        for (Node<E> q = first(); q != null; q = succ(q)) {
             E item = q.getItem();
             if (item != null)
                 al.add(item);
@@ -511,7 +604,15 @@
             lastRet = nextNode;
             E x = nextItem;
 
-            Node<E> p = (nextNode == null)? first() : nextNode.getNext();
+            Node<E> pred, p;
+            if (nextNode == null) {
+                p = first();
+                pred = null;
+            } else {
+                pred = nextNode;
+                p = succ(nextNode);
+            }
+
             for (;;) {
                 if (p == null) {
                     nextNode = null;
@@ -523,8 +624,13 @@
                     nextNode = p;
                     nextItem = item;
                     return x;
-                } else // skip over nulls
-                    p = p.getNext();
+                } else {
+                    // skip over nulls
+                    Node<E> next = succ(p);
+                    if (pred != null && next != null)
+                        pred.casNext(p, next);
+                    p = next;
+                }
             }
         }
 
@@ -549,7 +655,7 @@
     /**
      * Save the state to a stream (that is, serialize it).
      *
-     * @serialData All of the elements (each an <tt>E</tt>) in
+     * @serialData All of the elements (each an {@code E}) in
      * the proper order, followed by a null
      * @param s the stream
      */
@@ -560,7 +666,7 @@
         s.defaultWriteObject();
 
         // Write out all elements in the proper order.
-        for (Node<E> p = first(); p != null; p = p.getNext()) {
+        for (Node<E> p = first(); p != null; p = succ(p)) {
             Object item = p.getItem();
             if (item != null)
                 s.writeObject(item);
@@ -579,10 +685,11 @@
         throws java.io.IOException, ClassNotFoundException {
         // Read in capacity, and any hidden stuff
         s.defaultReadObject();
-        head = new Node<E>(null, null);
+        head = new Node<E>(null);
         tail = head;
         // Read in all elements and place in queue
         for (;;) {
+            @SuppressWarnings("unchecked")
             E item = (E)s.readObject();
             if (item == null)
                 break;
@@ -591,4 +698,35 @@
         }
     }
 
+    // Unsafe mechanics
+
+    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+    private static final long headOffset =
+        objectFieldOffset(UNSAFE, "head", ConcurrentLinkedQueue.class);
+    private static final long tailOffset =
+        objectFieldOffset(UNSAFE, "tail", ConcurrentLinkedQueue.class);
+
+    private boolean casTail(Node<E> cmp, Node<E> val) {
+        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
+    }
+
+    private boolean casHead(Node<E> cmp, Node<E> val) {
+        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
+    }
+
+    private void lazySetHead(Node<E> val) {
+        UNSAFE.putOrderedObject(this, headOffset, val);
+    }
+
+    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+                                  String field, Class<?> klazz) {
+        try {
+            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+        } catch (NoSuchFieldException e) {
+            // Convert Exception to corresponding Error
+            NoSuchFieldError error = new NoSuchFieldError(field);
+            error.initCause(e);
+            throw error;
+        }
+    }
 }