jdk/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java
changeset 43521 60e247b8d9a4
parent 42322 c3474fef4fe4
child 44743 f0bbd698c486
--- a/jdk/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java	Fri Feb 03 13:24:59 2017 -0800
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java	Fri Feb 03 13:24:59 2017 -0800
@@ -42,11 +42,13 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 /**
  * An unbounded {@link TransferQueue} based on linked nodes.
@@ -61,16 +63,15 @@
  * asynchronous nature of these queues, determining the current number
  * of elements requires a traversal of the elements, and so may report
  * inaccurate results if this collection is modified during traversal.
- * Additionally, the bulk operations {@code addAll},
- * {@code removeAll}, {@code retainAll}, {@code containsAll},
- * and {@code toArray} are <em>not</em> guaranteed
- * to be performed atomically. For example, an iterator operating
- * concurrently with an {@code addAll} operation might view only some
- * of the added elements.
  *
- * <p>This class and its iterator implement all of the
- * <em>optional</em> methods of the {@link Collection} and {@link
- * Iterator} interfaces.
+ * <p>Bulk operations that add, remove, or examine multiple elements,
+ * such as {@link #addAll}, {@link #removeIf} or {@link #forEach},
+ * are <em>not</em> guaranteed to be performed atomically.
+ * For example, a {@code forEach} traversal concurrent with an {@code
+ * addAll} operation might observe only some of the added elements.
+ *
+ * <p>This class and its iterator implement all of the <em>optional</em>
+ * methods of the {@link Collection} and {@link Iterator} interfaces.
  *
  * <p>Memory consistency effects: As with other concurrent
  * collections, actions in a thread prior to placing an object into a
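
Illustrative sketch (editorial, not part of the changeset): a minimal single-threaded example of the bulk operations named in the revised class javadoc above. The non-atomicity that the javadoc documents only manifests under concurrent access; the demo class name is hypothetical.

import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

public class BulkOpsSketch {                    // hypothetical demo class
    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        q.addAll(List.of("a", "b", "c"));       // not atomic: a concurrent forEach could
                                                // observe only some of the added elements
        q.forEach(System.out::println);         // weakly consistent traversal
        System.out.println(q.size());           // O(n): requires traversing the nodes
    }
}
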
@@ -158,9 +159,8 @@
      * correctly perform enqueue and dequeue operations by traversing
      * from a pointer to the initial node; CASing the item of the
      * first unmatched node on match and CASing the next field of the
-     * trailing node on appends. (Plus some special-casing when
-     * initially empty).  While this would be a terrible idea in
-     * itself, it does have the benefit of not requiring ANY atomic
+     * trailing node on appends.  While this would be a terrible idea
+     * in itself, it does have the benefit of not requiring ANY atomic
      * updates on head/tail fields.
      *
      * We introduce here an approach that lies between the extremes of
@@ -196,15 +196,15 @@
      * with a given probability per traversal step.
      *
      * In any strategy along these lines, because CASes updating
-     * fields may fail, the actual slack may exceed targeted
-     * slack. However, they may be retried at any time to maintain
-     * targets.  Even when using very small slack values, this
-     * approach works well for dual queues because it allows all
-     * operations up to the point of matching or appending an item
-     * (hence potentially allowing progress by another thread) to be
-     * read-only, thus not introducing any further contention. As
-     * described below, we implement this by performing slack
-     * maintenance retries only after these points.
+     * fields may fail, the actual slack may exceed targeted slack.
+     * However, they may be retried at any time to maintain targets.
+     * Even when using very small slack values, this approach works
+     * well for dual queues because it allows all operations up to the
+     * point of matching or appending an item (hence potentially
+     * allowing progress by another thread) to be read-only, thus not
+     * introducing any further contention.  As described below, we
+     * implement this by performing slack maintenance retries only
+     * after these points.
      *
      * As an accompaniment to such techniques, traversal overhead can
      * be further reduced without increasing contention of head
@@ -223,7 +223,7 @@
      * (Similar issues arise in non-GC environments.)  To cope with
      * this in our implementation, upon CASing to advance the head
      * pointer, we set the "next" link of the previous head to point
-     * only to itself; thus limiting the length of connected dead lists.
+     * only to itself; thus limiting the length of chains of dead nodes.
      * (We also take similar care to wipe out possibly garbage
      * retaining values held in other Node fields.)  However, doing so
      * adds some further complexity to traversal: If any "next"
@@ -266,15 +266,6 @@
      * interior nodes) except in the case of cancellation/removal (see
      * below).
      *
-     * We allow both the head and tail fields to be null before any
-     * nodes are enqueued; initializing upon first append.  This
-     * simplifies some other logic, as well as providing more
-     * efficient explicit control paths instead of letting JVMs insert
-     * implicit NullPointerExceptions when they are null.  While not
-     * currently fully implemented, we also leave open the possibility
-     * of re-nulling these fields when empty (which is complicated to
-     * arrange, for little benefit.)
-     *
      * All enqueue/dequeue operations are handled by the single method
      * "xfer" with parameters indicating whether to act as some form
      * of offer, put, poll, take, or transfer (each possibly with
@@ -282,44 +273,40 @@
      * method outweighs the code bulk and maintenance problems of
      * using separate methods for each case.
      *
-     * Operation consists of up to three phases. The first is
-     * implemented within method xfer, the second in tryAppend, and
-     * the third in method awaitMatch.
+     * Operation consists of up to two phases. The first is implemented
+     * in method xfer, the second in method awaitMatch.
      *
-     * 1. Try to match an existing node
+     * 1. Traverse until matching or appending (method xfer)
      *
-     *    Starting at head, skip already-matched nodes until finding
-     *    an unmatched node of opposite mode, if one exists, in which
-     *    case matching it and returning, also if necessary updating
-     *    head to one past the matched node (or the node itself if the
-     *    list has no other unmatched nodes). If the CAS misses, then
-     *    a loop retries advancing head by two steps until either
-     *    success or the slack is at most two. By requiring that each
-     *    attempt advances head by two (if applicable), we ensure that
-     *    the slack does not grow without bound. Traversals also check
-     *    if the initial head is now off-list, in which case they
-     *    start at the new head.
+     *    Conceptually, we simply traverse all nodes starting from head.
+     *    If we encounter an unmatched node of opposite mode, we match
+     *    it and return, also updating head (by at least 2 hops) to
+     *    one past the matched node (or the node itself if it's the
+     *    pinned trailing node).  Traversals also check for the
+     *    possibility of falling off-list, in which case they restart.
      *
-     *    If no candidates are found and the call was untimed
-     *    poll/offer, (argument "how" is NOW) return.
-     *
-     * 2. Try to append a new node (method tryAppend)
+     *    If the trailing node of the list is reached, a match is not
+     *    possible.  If this call was untimed poll or tryTransfer
+     *    (argument "how" is NOW), return empty-handed immediately.
+     *    Else a new node is CAS-appended.  On successful append, if
+     *    this call was ASYNC (e.g. offer), an element was
+     *    successfully added to the end of the queue and we return.
      *
-     *    Starting at current tail pointer, find the actual last node
-     *    and try to append a new node (or if head was null, establish
-     *    the first node). Nodes can be appended only if their
-     *    predecessors are either already matched or are of the same
-     *    mode. If we detect otherwise, then a new node with opposite
-     *    mode must have been appended during traversal, so we must
-     *    restart at phase 1. The traversal and update steps are
-     *    otherwise similar to phase 1: Retrying upon CAS misses and
-     *    checking for staleness.  In particular, if a self-link is
-     *    encountered, then we can safely jump to a node on the list
-     *    by continuing the traversal at current head.
+     *    Of course, this naive traversal is O(n) when no match is
+     *    possible.  We optimize the traversal by maintaining a tail
+     *    pointer, which is expected to be "near" the end of the list.
+     *    It is only safe to fast-forward to tail (in the presence of
+     *    arbitrary concurrent changes) if it is pointing to a node of
+     *    the same mode, even if it is dead (in this case no preceding
+     *    node could still be matchable by this traversal).  If we
+     *    need to restart due to falling off-list, we can again
+     *    fast-forward to tail, but only if it has changed since the
+     *    last traversal (else we might loop forever).  If tail cannot
+     *    be used, traversal starts at head (but in this case we
+     *    expect to be able to match near head).  As with head, we
+     *    CAS-advance the tail pointer by at least two hops.
      *
-     *    On successful append, if the call was ASYNC, return.
-     *
-     * 3. Await match or cancellation (method awaitMatch)
+     * 2. Await match or cancellation (method awaitMatch)
      *
      *    Wait for another thread to match node; instead cancelling if
      *    the current thread was interrupted or the wait timed out. On
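
For orientation (editorial sketch, not part of the changeset): how the public API reaches the "how" modes described above, per the NOW/ASYNC/SYNC/TIMED constants defined later in this patch. The demo class name is hypothetical.

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class XferModesSketch {                  // hypothetical demo class
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();

        System.out.println(q.poll());           // null: NOW, returns empty-handed, no append
        System.out.println(q.tryTransfer(1));   // false: NOW, no waiting consumer to match

        q.offer(1);                             // ASYNC: appends a data node and returns

        System.out.println(q.poll(10, TimeUnit.MILLISECONDS)); // 1: TIMED, matches at once

        // SYNC (take, transfer) would append a node and block in awaitMatch until matched.
    }
}
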
@@ -373,12 +360,12 @@
      * from, the head of list.
      *
      * Without taking these into account, it would be possible for an
-     * unbounded number of supposedly removed nodes to remain
-     * reachable.  Situations leading to such buildup are uncommon but
-     * can occur in practice; for example when a series of short timed
-     * calls to poll repeatedly time out but never otherwise fall off
-     * the list because of an untimed call to take at the front of the
-     * queue.
+     * unbounded number of supposedly removed nodes to remain reachable.
+     * Situations leading to such buildup are uncommon but can occur
+     * in practice; for example when a series of short timed calls to
+     * poll repeatedly time out at the trailing node but otherwise
+     * never fall off the list because of an untimed call to take() at
+     * the front of the queue.
      *
      * When these cases arise, rather than always retraversing the
      * entire list to find an actual predecessor to unlink (which
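
A rough, timing-dependent editorial sketch of the buildup scenario described above, where short timed polls cancel behind a consumer parked in take(); the class name, loop count, and sleep are illustrative only.

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class SweepScenarioSketch {              // hypothetical demo class
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();

        Thread taker = new Thread(() -> {       // parks in take(), pinning a request node
            try { q.take(); } catch (InterruptedException ignored) { }
        });
        taker.start();
        Thread.sleep(50);                       // give the taker time to enqueue and park

        for (int i = 0; i < 100; i++)           // timed polls append request nodes behind it,
            q.poll(1, TimeUnit.MICROSECONDS);   // time out, and cancel (dead nodes to sweep)

        q.put("done");                          // matches the parked take()
        taker.join();
    }
}
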
@@ -391,10 +378,9 @@
      * We perform sweeps by the thread hitting threshold (rather than
      * background threads or by spreading work to other threads)
      * because in the main contexts in which removal occurs, the
-     * caller is already timed-out, cancelled, or performing a
-     * potentially O(n) operation (e.g. remove(x)), none of which are
-     * time-critical enough to warrant the overhead that alternatives
-     * would impose on other threads.
+     * caller is timed-out or cancelled, which are not time-critical
+     * enough to warrant the overhead that alternatives would impose
+     * on other threads.
      *
      * Because the sweepVotes estimate is conservative, and because
      * nodes become unlinked "naturally" as they fall off the head of
@@ -406,6 +392,13 @@
      * quiescent queues. The value defined below was chosen
      * empirically to balance these under various timeout scenarios.
      *
+     * Because traversal operations on the linked list of nodes are a
+     * natural opportunity to sweep dead nodes, we generally do so,
+     * including all the operations that might remove elements as they
+     * traverse, such as removeIf and Iterator.remove.  This largely
+     * eliminates long chains of dead interior nodes, except from
+     * cancelled or timed out blocking operations.
+     *
      * Note that we cannot self-link unlinked interior nodes during
      * sweeps. However, the associated garbage chains terminate when
      * some successor ultimately falls off the head of the list and is
@@ -446,54 +439,71 @@
 
     /**
      * Queue nodes. Uses Object, not E, for items to allow forgetting
-     * them after use.  Relies heavily on VarHandles to minimize
-     * unnecessary ordering constraints: Writes that are intrinsically
-     * ordered wrt other accesses or CASes use simple relaxed forms.
+     * them after use.  Writes that are intrinsically ordered wrt
+     * other accesses or CASes use simple relaxed forms.
      */
     static final class Node {
         final boolean isData;   // false if this is a request node
         volatile Object item;   // initially non-null if isData; CASed to match
         volatile Node next;
-        volatile Thread waiter; // null until waiting
+        volatile Thread waiter; // null when not waiting for a match
 
-        // CAS methods for fields
+        /**
+         * Constructs a data node holding item if item is non-null,
+         * else a request node.  Uses relaxed write because item can
+         * only be seen after piggy-backing publication via CAS.
+         */
+        Node(Object item) {
+            ITEM.set(this, item);
+            isData = (item != null);
+        }
+
+        /** Constructs a (matched data) dummy node. */
+        Node() {
+            isData = true;
+        }
+
         final boolean casNext(Node cmp, Node val) {
+            // assert val != null;
             return NEXT.compareAndSet(this, cmp, val);
         }
 
         final boolean casItem(Object cmp, Object val) {
-            // assert cmp == null || cmp.getClass() != Node.class;
+            // assert isData == (cmp != null);
+            // assert isData == (val == null);
+            // assert !(cmp instanceof Node);
             return ITEM.compareAndSet(this, cmp, val);
         }
 
         /**
-         * Constructs a new node.  Uses relaxed write because item can
-         * only be seen after publication via casNext.
-         */
-        Node(Object item, boolean isData) {
-            ITEM.set(this, item); // relaxed write
-            this.isData = isData;
-        }
-
-        /**
          * Links node to itself to avoid garbage retention.  Called
          * only after CASing head field, so uses relaxed write.
          */
-        final void forgetNext() {
-            NEXT.set(this, this);
+        final void selfLink() {
+            // assert isMatched();
+            NEXT.setRelease(this, this);
+        }
+
+        final void appendRelaxed(Node next) {
+            // assert next != null;
+            // assert this.next == null;
+            NEXT.set(this, next);
         }
 
         /**
-         * Sets item to self and waiter to null, to avoid garbage
-         * retention after matching or cancelling. Uses relaxed writes
-         * because order is already constrained in the only calling
-         * contexts: item is forgotten only after volatile/atomic
-         * mechanics that extract items.  Similarly, clearing waiter
-         * follows either CAS or return from park (if ever parked;
-         * else we don't care).
+         * Sets item (of a request node) to self and waiter to null,
+         * to avoid garbage retention after matching or cancelling.
+         * Uses relaxed writes because order is already constrained in
+         * the only calling contexts: item is forgotten only after
+         * volatile/atomic mechanics that extract items, and visitors
+         * of request nodes only ever check whether item is null.
+         * Similarly, clearing waiter follows either CAS or return
+         * from park (if ever parked; else we don't care).
          */
         final void forgetContents() {
-            ITEM.set(this, this);
+            // assert isMatched();
+            if (!isData)
+                ITEM.set(this, this);
             WAITER.set(this, null);
         }
 
@@ -502,15 +512,16 @@
          * case of artificial matches due to cancellation.
          */
         final boolean isMatched() {
-            Object x = item;
-            return (x == this) || ((x == null) == isData);
+            return isData == (item == null);
         }
 
-        /**
-         * Returns true if this is an unmatched request node.
-         */
-        final boolean isUnmatchedRequest() {
-            return !isData && item == null;
+        /** Tries to CAS-match this node; if successful, wakes waiter. */
+        final boolean tryMatch(Object cmp, Object val) {
+            if (casItem(cmp, val)) {
+                LockSupport.unpark(waiter);
+                return true;
+            }
+            return false;
         }
 
         /**
@@ -520,52 +531,46 @@
          */
         final boolean cannotPrecede(boolean haveData) {
             boolean d = isData;
-            Object x;
-            return d != haveData && (x = item) != this && (x != null) == d;
-        }
-
-        /**
-         * Tries to artificially match a data node -- used by remove.
-         */
-        final boolean tryMatchData() {
-            // assert isData;
-            Object x = item;
-            if (x != null && x != this && casItem(x, null)) {
-                LockSupport.unpark(waiter);
-                return true;
-            }
-            return false;
+            return d != haveData && d != (item == null);
         }
 
         private static final long serialVersionUID = -3375979862319811754L;
-
-        // VarHandle mechanics
-        private static final VarHandle ITEM;
-        private static final VarHandle NEXT;
-        private static final VarHandle WAITER;
-        static {
-            try {
-                MethodHandles.Lookup l = MethodHandles.lookup();
-                ITEM = l.findVarHandle(Node.class, "item", Object.class);
-                NEXT = l.findVarHandle(Node.class, "next", Node.class);
-                WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
-            } catch (ReflectiveOperationException e) {
-                throw new Error(e);
-            }
-        }
     }
 
-    /** head of the queue; null until first enqueue */
+    /**
+     * A node from which the first live (non-matched) node (if any)
+     * can be reached in O(1) time.
+     * Invariants:
+     * - all live nodes are reachable from head via .next
+     * - head != null
+     * - (tmp = head).next != tmp || tmp != head
+     * Non-invariants:
+     * - head may or may not be live
+     * - it is permitted for tail to lag behind head, that is, for tail
+     *   to not be reachable from head!
+     */
     transient volatile Node head;
 
-    /** tail of the queue; null until first append */
+    /**
+     * A node from which the last node on list (that is, the unique
+     * node with node.next == null) can be reached in O(1) time.
+     * Invariants:
+     * - the last node is always reachable from tail via .next
+     * - tail != null
+     * Non-invariants:
+     * - tail may or may not be live
+     * - it is permitted for tail to lag behind head, that is, for tail
+     *   to not be reachable from head!
+     * - tail.next may or may not be self-linked.
+     */
     private transient volatile Node tail;
 
-    /** The number of apparent failures to unsplice removed nodes */
+    /** The number of apparent failures to unsplice cancelled nodes */
     private transient volatile int sweepVotes;
 
-    // CAS methods for fields
     private boolean casTail(Node cmp, Node val) {
+        // assert cmp != null;
+        // assert val != null;
         return TAIL.compareAndSet(this, cmp, val);
     }
 
@@ -573,13 +578,71 @@
         return HEAD.compareAndSet(this, cmp, val);
     }
 
-    private boolean casSweepVotes(int cmp, int val) {
-        return SWEEPVOTES.compareAndSet(this, cmp, val);
+    /** Atomic version of ++sweepVotes. */
+    private int incSweepVotes() {
+        return (int) SWEEPVOTES.getAndAdd(this, 1) + 1;
+    }
+
+    /**
+     * Tries to CAS pred.next (or head, if pred is null) from c to p.
+     * Caller must ensure that we're not unlinking the trailing node.
+     */
+    private boolean tryCasSuccessor(Node pred, Node c, Node p) {
+        // assert p != null;
+        // assert c.isData != (c.item != null);
+        // assert c != p;
+        if (pred != null)
+            return pred.casNext(c, p);
+        if (casHead(c, p)) {
+            c.selfLink();
+            return true;
+        }
+        return false;
     }
 
-    /*
-     * Possible values for "how" argument in xfer method.
+    /**
+     * Collapses dead (matched) nodes between pred and q.
+     * @param pred the last known live node, or null if none
+     * @param c the first dead node
+     * @param p the last dead node
+     * @param q p.next: the next live node, or null if at end
+     * @return pred if pred still alive and CAS succeeded; else p
      */
+    private Node skipDeadNodes(Node pred, Node c, Node p, Node q) {
+        // assert pred != c;
+        // assert p != q;
+        // assert c.isMatched();
+        // assert p.isMatched();
+        if (q == null) {
+            // Never unlink trailing node.
+            if (c == p) return pred;
+            q = p;
+        }
+        return (tryCasSuccessor(pred, c, q)
+                && (pred == null || !pred.isMatched()))
+            ? pred : p;
+    }
+
+    /**
+     * Collapses dead (matched) nodes from h (which was once head) to p.
+     * Caller ensures all nodes from h up to and including p are dead.
+     */
+    private void skipDeadNodesNearHead(Node h, Node p) {
+        // assert h != null;
+        // assert h != p;
+        // assert p.isMatched();
+        for (;;) {
+            final Node q;
+            if ((q = p.next) == null) break;
+            else if (!q.isMatched()) { p = q; break; }
+            else if (p == (p = q)) return;
+        }
+        if (casHead(h, p))
+            h.selfLink();
+    }
+
+    /* Possible values for "how" argument in xfer method. */
+
     private static final int NOW   = 0; // for untimed poll, tryTransfer
     private static final int ASYNC = 1; // for offer, put, add
     private static final int SYNC  = 2; // for transfer, take
@@ -595,84 +658,32 @@
      * @return an item if matched, else e
      * @throws NullPointerException if haveData mode but e is null
      */
+    @SuppressWarnings("unchecked")
     private E xfer(E e, boolean haveData, int how, long nanos) {
         if (haveData && (e == null))
             throw new NullPointerException();
-        Node s = null;                        // the node to append, if needed
 
-        retry:
-        for (;;) {                            // restart on append race
-
-            for (Node h = head, p = h; p != null;) { // find & match first node
-                boolean isData = p.isData;
-                Object item = p.item;
-                if (item != p && (item != null) == isData) { // unmatched
-                    if (isData == haveData)   // can't match
-                        break;
-                    if (p.casItem(item, e)) { // match
-                        for (Node q = p; q != h;) {
-                            Node n = q.next;  // update by 2 unless singleton
-                            if (head == h && casHead(h, n == null ? q : n)) {
-                                h.forgetNext();
-                                break;
-                            }                 // advance and retry
-                            if ((h = head)   == null ||
-                                (q = h.next) == null || !q.isMatched())
-                                break;        // unless slack < 2
-                        }
-                        LockSupport.unpark(p.waiter);
-                        @SuppressWarnings("unchecked") E itemE = (E) item;
-                        return itemE;
+        restart: for (Node s = null, t = null, h = null;;) {
+            for (Node p = (t != (t = tail) && t.isData == haveData) ? t
+                     : (h = head);; ) {
+                final Node q; final Object item;
+                if (p.isData != haveData
+                    && haveData == ((item = p.item) == null)) {
+                    if (h == null) h = head;
+                    if (p.tryMatch(item, e)) {
+                        if (h != p) skipDeadNodesNearHead(h, p);
+                        return (E) item;
                     }
                 }
-                Node n = p.next;
-                p = (p != n) ? n : (h = head); // Use head if p offlist
-            }
-
-            if (how != NOW) {                 // No matches available
-                if (s == null)
-                    s = new Node(e, haveData);
-                Node pred = tryAppend(s, haveData);
-                if (pred == null)
-                    continue retry;           // lost race vs opposite mode
-                if (how != ASYNC)
-                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
-            }
-            return e; // not waiting
-        }
-    }
-
-    /**
-     * Tries to append node s as tail.
-     *
-     * @param s the node to append
-     * @param haveData true if appending in data mode
-     * @return null on failure due to losing race with append in
-     * different mode, else s's predecessor, or s itself if no
-     * predecessor
-     */
-    private Node tryAppend(Node s, boolean haveData) {
-        for (Node t = tail, p = t;;) {        // move p to last node and append
-            Node n, u;                        // temps for reads of next & tail
-            if (p == null && (p = head) == null) {
-                if (casHead(null, s))
-                    return s;                 // initialize
-            }
-            else if (p.cannotPrecede(haveData))
-                return null;                  // lost race vs opposite mode
-            else if ((n = p.next) != null)    // not last; keep traversing
-                p = p != t && t != (u = tail) ? (t = u) : // stale tail
-                    (p != n) ? n : null;      // restart if off list
-            else if (!p.casNext(null, s))
-                p = p.next;                   // re-read on CAS failure
-            else {
-                if (p != t) {                 // update if slack now >= 2
-                    while ((tail != t || !casTail(t, s)) &&
-                           (t = tail)   != null &&
-                           (s = t.next) != null && // advance and retry
-                           (s = s.next) != null && s != t);
+                if ((q = p.next) == null) {
+                    if (how == NOW) return e;
+                    if (s == null) s = new Node(e);
+                    if (!p.casNext(null, s)) continue;
+                    if (p != t) casTail(t, s);
+                    if (how == ASYNC) return e;
+                    return awaitMatch(s, p, e, (how == TIMED), nanos);
                 }
-                return p;
+                if (p == (p = q)) continue restart;
             }
         }
     }
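
To see the matching path of xfer end-to-end from the public API, a small editorial sketch: a producer blocks in transfer() (SYNC) until a consumer's take() matches its data node. The class name and the sleep are illustrative only.

import java.util.concurrent.LinkedTransferQueue;

public class TransferSketch {                   // hypothetical demo class
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();

        Thread producer = new Thread(() -> {
            try {
                q.transfer("hello");            // SYNC: blocks until a consumer matches
                System.out.println("transferred");
            } catch (InterruptedException ignored) { }
        });
        producer.start();
        Thread.sleep(50);                       // let the producer append its data node

        System.out.println(q.hasWaitingConsumer()); // false: the parked waiter is a producer
        System.out.println(q.take());           // "hello": matches the data node, unparks
        producer.join();
    }
}
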
@@ -681,9 +692,9 @@
      * Spins/yields/blocks until node s is matched or caller gives up.
      *
      * @param s the waiting node
-     * @param pred the predecessor of s, or s itself if it has no
-     * predecessor, or null if unknown (the null case does not occur
-     * in any current calls but may in possible future extensions)
+     * @param pred the predecessor of s, or null if unknown (the null
+     * case does not occur in any current calls but may in possible
+     * future extensions)
      * @param e the comparison value for checking match
      * @param timed if true, wait only until timeout elapses
      * @param nanos timeout in nanosecs, used only if timed is true
@@ -696,17 +707,20 @@
         ThreadLocalRandom randomYields = null; // bound if needed
 
         for (;;) {
-            Object item = s.item;
-            if (item != e) {                  // matched
+            final Object item;
+            if ((item = s.item) != e) {       // matched
                 // assert item != s;
                 s.forgetContents();           // avoid garbage
                 @SuppressWarnings("unchecked") E itemE = (E) item;
                 return itemE;
             }
             else if (w.isInterrupted() || (timed && nanos <= 0L)) {
-                unsplice(pred, s);           // try to unlink and cancel
-                if (s.casItem(e, s))         // return normally if lost CAS
+                // try to cancel and unlink
+                if (s.casItem(e, s.isData ? null : s)) {
+                    unsplice(pred, s);
                     return e;
+                }
+                // return normally if lost CAS
             }
             else if (spins < 0) {            // establish spins at/near front
                 if ((spins = spinsFor(pred, s.isData)) > 0)
@@ -750,34 +764,33 @@
     /* -------------- Traversal methods -------------- */
 
     /**
-     * Returns the successor of p, or the head node if p.next has been
-     * linked to self, which will only be true if traversing with a
-     * stale pointer that is now off the list.
-     */
-    final Node succ(Node p) {
-        Node next = p.next;
-        return (p == next) ? head : next;
-    }
-
-    /**
      * Returns the first unmatched data node, or null if none.
-     * Callers must recheck if the returned node's item field is null
-     * or self-linked before using.
+     * Callers must recheck if the returned node is unmatched
+     * before using.
      */
     final Node firstDataNode() {
+        Node first = null;
         restartFromHead: for (;;) {
-            for (Node p = head; p != null;) {
-                Object item = p.item;
-                if (p.isData) {
-                    if (item != null && item != p)
-                        return p;
+            Node h = head, p = h;
+            for (; p != null;) {
+                final Object item;
+                if ((item = p.item) != null) {
+                    if (p.isData) {
+                        first = p;
+                        break;
+                    }
                 }
-                else if (item == null)
+                else if (!p.isData)
                     break;
-                if (p == (p = p.next))
+                final Node q;
+                if ((q = p.next) == null)
+                    break;
+                if (p == (p = q))
                     continue restartFromHead;
             }
-            return null;
+            if (p != h && casHead(h, p))
+                h.selfLink();
+            return first;
         }
     }
 
@@ -810,7 +823,7 @@
             for (Node p = head; p != null;) {
                 Object item = p.item;
                 if (p.isData) {
-                    if (item != null && item != p) {
+                    if (item != null) {
                         if (a == null)
                             a = new String[4];
                         else if (size == a.length)
@@ -839,7 +852,7 @@
             for (Node p = head; p != null;) {
                 Object item = p.item;
                 if (p.isData) {
-                    if (item != null && item != p) {
+                    if (item != null) {
                         if (x == null)
                             x = new Object[4];
                         else if (size == x.length)
@@ -918,76 +931,50 @@
      */
     @SuppressWarnings("unchecked")
     public <T> T[] toArray(T[] a) {
-        if (a == null) throw new NullPointerException();
+        Objects.requireNonNull(a);
         return (T[]) toArrayInternal(a);
     }
 
+    /**
+     * Weakly-consistent iterator.
+     *
+     * Lazily updated ancestor field provides expected-case O(1) remove(),
+     * but still O(n) in the worst case, when lastRet is concurrently deleted.
+     * Lazily updated ancestor field provides expected-case O(1) remove(),
+     * but still O(n) in the worst case, when lastRet is concurrently deleted.
+     */
     final class Itr implements Iterator<E> {
         private Node nextNode;   // next node to return item for
         private E nextItem;      // the corresponding item
         private Node lastRet;    // last returned node, to support remove
-        private Node lastPred;   // predecessor to unlink lastRet
+        private Node ancestor;   // Helps unlink lastRet on remove()
 
         /**
-         * Moves to next node after prev, or first node if prev null.
+         * Moves to next node after pred, or first node if pred null.
          */
-        private void advance(Node prev) {
-            /*
-             * To track and avoid buildup of deleted nodes in the face
-             * of calls to both Queue.remove and Itr.remove, we must
-             * include variants of unsplice and sweep upon each
-             * advance: Upon Itr.remove, we may need to catch up links
-             * from lastPred, and upon other removes, we might need to
-             * skip ahead from stale nodes and unsplice deleted ones
-             * found while advancing.
-             */
-
-            Node r, b; // reset lastPred upon possible deletion of lastRet
-            if ((r = lastRet) != null && !r.isMatched())
-                lastPred = r;    // next lastPred is old lastRet
-            else if ((b = lastPred) == null || b.isMatched())
-                lastPred = null; // at start of list
-            else {
-                Node s, n;       // help with removal of lastPred.next
-                while ((s = b.next) != null &&
-                       s != b && s.isMatched() &&
-                       (n = s.next) != null && n != s)
-                    b.casNext(s, n);
+        @SuppressWarnings("unchecked")
+        private void advance(Node pred) {
+            for (Node p = (pred == null) ? head : pred.next, c = p;
+                 p != null; ) {
+                final Object item;
+                if ((item = p.item) != null && p.isData) {
+                    nextNode = p;
+                    nextItem = (E) item;
+                    if (c != p)
+                        tryCasSuccessor(pred, c, p);
+                    return;
+                }
+                else if (!p.isData && item == null)
+                    break;
+                if (c != p && !tryCasSuccessor(pred, c, c = p)) {
+                    pred = p;
+                    c = p = p.next;
+                }
+                else if (p == (p = p.next)) {
+                    pred = null;
+                    c = p = head;
+                }
             }
-
-            this.lastRet = prev;
-
-            for (Node p = prev, s, n;;) {
-                s = (p == null) ? head : p.next;
-                if (s == null)
-                    break;
-                else if (s == p) {
-                    p = null;
-                    continue;
-                }
-                Object item = s.item;
-                if (s.isData) {
-                    if (item != null && item != s) {
-                        @SuppressWarnings("unchecked") E itemE = (E) item;
-                        nextItem = itemE;
-                        nextNode = s;
-                        return;
-                    }
-                }
-                else if (item == null)
-                    break;
-                // assert s.isMatched();
-                if (p == null)
-                    p = s;
-                else if ((n = s.next) == null)
-                    break;
-                else if (s == n)
-                    p = null;
-                else
-                    p.casNext(s, n);
-            }
+            nextItem = null;
             nextNode = null;
-            nextItem = null;
         }
 
         Itr() {
@@ -999,25 +986,67 @@
         }
 
         public final E next() {
-            Node p = nextNode;
-            if (p == null) throw new NoSuchElementException();
+            final Node p;
+            if ((p = nextNode) == null) throw new NoSuchElementException();
             E e = nextItem;
-            advance(p);
+            advance(lastRet = p);
             return e;
         }
 
+        public void forEachRemaining(Consumer<? super E> action) {
+            Objects.requireNonNull(action);
+            Node q = null;
+            for (Node p; (p = nextNode) != null; advance(q = p))
+                action.accept(nextItem);
+            if (q != null)
+                lastRet = q;
+        }
+
         public final void remove() {
             final Node lastRet = this.lastRet;
             if (lastRet == null)
                 throw new IllegalStateException();
             this.lastRet = null;
-            if (lastRet.tryMatchData())
-                unsplice(lastPred, lastRet);
+            if (lastRet.item == null)   // already deleted?
+                return;
+            // Advance ancestor, collapsing intervening dead nodes
+            Node pred = ancestor;
+            for (Node p = (pred == null) ? head : pred.next, c = p, q;
+                 p != null; ) {
+                if (p == lastRet) {
+                    final Object item;
+                    if ((item = p.item) != null)
+                        p.tryMatch(item, null);
+                    if ((q = p.next) == null) q = p;
+                    if (c != q) tryCasSuccessor(pred, c, q);
+                    ancestor = pred;
+                    return;
+                }
+                final Object item; final boolean pAlive;
+                if (pAlive = ((item = p.item) != null && p.isData)) {
+                    // exceptionally, nothing to do
+                }
+                else if (!p.isData && item == null)
+                    break;
+                if ((c != p && !tryCasSuccessor(pred, c, c = p)) || pAlive) {
+                    pred = p;
+                    c = p = p.next;
+                }
+                else if (p == (p = p.next)) {
+                    pred = null;
+                    c = p = head;
+                }
+            }
+            // traversal failed to find lastRet; must have been deleted;
+            // leave ancestor at original location to avoid overshoot;
+            // better luck next time!
+
+            // assert lastRet.isMatched();
         }
     }
 
     /** A customized variant of Spliterators.IteratorSpliterator */
-    final class LTQSpliterator<E> implements Spliterator<E> {
+    final class LTQSpliterator implements Spliterator<E> {
         static final int MAX_BATCH = 1 << 25;  // max batch array size;
         Node current;       // current node; null until initialized
         int batch;          // batch size for splits
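
From the caller's side, the rewritten Itr above is ordinary weakly-consistent iteration; its lazily updated ancestor only matters for how remove() unlinks the last returned node. A brief editorial sketch, with a hypothetical class name:

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

public class IteratorRemoveSketch {             // hypothetical demo class
    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>(List.of("a", "b", "c"));
        for (Iterator<String> it = q.iterator(); it.hasNext(); ) {
            if ("b".equals(it.next()))
                it.remove();                    // unlinks via the lazily updated ancestor
        }
        System.out.println(q);                  // [a, c]
    }
}
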
@@ -1025,79 +1054,90 @@
         LTQSpliterator() {}
 
         public Spliterator<E> trySplit() {
-            Node p;
-            int b = batch;
-            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
-            if (!exhausted &&
-                ((p = current) != null || (p = firstDataNode()) != null) &&
-                p.next != null) {
-                Object[] a = new Object[n];
-                int i = 0;
-                do {
-                    Object e = p.item;
-                    if (e != p && (a[i] = e) != null)
-                        ++i;
-                    if (p == (p = p.next))
-                        p = firstDataNode();
-                } while (p != null && i < n && p.isData);
-                if ((current = p) == null)
-                    exhausted = true;
-                if (i > 0) {
-                    batch = i;
-                    return Spliterators.spliterator
-                        (a, 0, i, (Spliterator.ORDERED |
-                                   Spliterator.NONNULL |
-                                   Spliterator.CONCURRENT));
+            Node p, q;
+            if ((p = current()) == null || (q = p.next) == null)
+                return null;
+            int i = 0, n = batch = Math.min(batch + 1, MAX_BATCH);
+            Object[] a = null;
+            do {
+                final Object item = p.item;
+                if (p.isData) {
+                    if (item != null) {
+                        if (a == null)
+                            a = new Object[n];
+                        a[i++] = item;
+                    }
+                } else if (item == null) {
+                    p = null;
+                    break;
                 }
-            }
-            return null;
+                if (p == (p = q))
+                    p = firstDataNode();
+            } while (p != null && (q = p.next) != null && i < n);
+            setCurrent(p);
+            return (i == 0) ? null :
+                Spliterators.spliterator(a, 0, i, (Spliterator.ORDERED |
+                                                   Spliterator.NONNULL |
+                                                   Spliterator.CONCURRENT));
         }
 
-        @SuppressWarnings("unchecked")
         public void forEachRemaining(Consumer<? super E> action) {
-            Node p;
-            if (action == null) throw new NullPointerException();
-            if (!exhausted &&
-                ((p = current) != null || (p = firstDataNode()) != null)) {
+            Objects.requireNonNull(action);
+            final Node p;
+            if ((p = current()) != null) {
+                current = null;
                 exhausted = true;
-                do {
-                    Object e = p.item;
-                    if (e != null && e != p)
-                        action.accept((E)e);
-                    if (p == (p = p.next))
-                        p = firstDataNode();
-                } while (p != null && p.isData);
+                forEachFrom(action, p);
             }
         }
 
         @SuppressWarnings("unchecked")
         public boolean tryAdvance(Consumer<? super E> action) {
+            Objects.requireNonNull(action);
             Node p;
-            if (action == null) throw new NullPointerException();
-            if (!exhausted &&
-                ((p = current) != null || (p = firstDataNode()) != null)) {
-                Object e;
+            if ((p = current()) != null) {
+                E e = null;
                 do {
-                    if ((e = p.item) == p)
-                        e = null;
+                    final Object item = p.item;
+                    final boolean isData = p.isData;
                     if (p == (p = p.next))
-                        p = firstDataNode();
-                } while (e == null && p != null && p.isData);
-                if ((current = p) == null)
-                    exhausted = true;
+                        p = head;
+                    if (isData) {
+                        if (item != null) {
+                            e = (E) item;
+                            break;
+                        }
+                    }
+                    else if (item == null)
+                        p = null;
+                } while (p != null);
+                setCurrent(p);
                 if (e != null) {
-                    action.accept((E)e);
+                    action.accept(e);
                     return true;
                 }
             }
             return false;
         }
 
+        private void setCurrent(Node p) {
+            if ((current = p) == null)
+                exhausted = true;
+        }
+
+        private Node current() {
+            Node p;
+            if ((p = current) == null && !exhausted)
+                setCurrent(p = firstDataNode());
+            return p;
+        }
+
         public long estimateSize() { return Long.MAX_VALUE; }
 
         public int characteristics() {
-            return Spliterator.ORDERED | Spliterator.NONNULL |
-                Spliterator.CONCURRENT;
+            return (Spliterator.ORDERED |
+                    Spliterator.NONNULL |
+                    Spliterator.CONCURRENT);
         }
     }
 
@@ -1118,7 +1158,7 @@
      * @since 1.8
      */
     public Spliterator<E> spliterator() {
-        return new LTQSpliterator<E>();
+        return new LTQSpliterator();
     }
 
     /* -------------- Removal methods -------------- */
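
The spliterator reports ORDERED | NONNULL | CONCURRENT and is weakly consistent, so streams over the queue run alongside modification without ConcurrentModificationException. A short editorial sketch; the class name is hypothetical.

import java.util.Spliterator;
import java.util.concurrent.LinkedTransferQueue;

public class SpliteratorSketch {                // hypothetical demo class
    public static void main(String[] args) {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
        for (int i = 0; i < 10; i++) q.offer(i);

        Spliterator<Integer> s = q.spliterator();
        System.out.println(s.hasCharacteristics(Spliterator.CONCURRENT)); // true

        long evens = q.stream().filter(i -> i % 2 == 0).count();
        System.out.println(evens);              // 5
    }
}
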
@@ -1128,10 +1168,15 @@
      * the given predecessor.
      *
      * @param pred a node that was at one time known to be the
-     * predecessor of s, or null or s itself if s is/was at head
+     * predecessor of s
      * @param s the node to be unspliced
      */
     final void unsplice(Node pred, Node s) {
+        // assert pred != null;
+        // assert pred != s;
+        // assert s != null;
+        // assert s.isMatched();
+        // assert (SWEEP_THRESHOLD & (SWEEP_THRESHOLD - 1)) == 0;
         s.waiter = null; // disable signals
         /*
          * See above for rationale. Briefly: if pred still points to
@@ -1140,13 +1185,13 @@
          * nor s are head or offlist, add to sweepVotes, and if enough
          * votes have accumulated, sweep.
          */
-        if (pred != null && pred != s && pred.next == s) {
+        if (pred != null && pred.next == s) {
             Node n = s.next;
             if (n == null ||
                 (n != s && pred.casNext(s, n) && pred.isMatched())) {
                 for (;;) {               // check if at, or could be, head
                     Node h = head;
-                    if (h == pred || h == s || h == null)
+                    if (h == pred || h == s)
                         return;          // at head or list empty
                     if (!h.isMatched())
                         break;
@@ -1154,21 +1199,12 @@
                     if (hn == null)
                         return;          // now empty
                     if (hn != h && casHead(h, hn))
-                        h.forgetNext();  // advance head
+                        h.selfLink();  // advance head
                 }
-                if (pred.next != pred && s.next != s) { // recheck if offlist
-                    for (;;) {           // sweep now if enough votes
-                        int v = sweepVotes;
-                        if (v < SWEEP_THRESHOLD) {
-                            if (casSweepVotes(v, v + 1))
-                                break;
-                        }
-                        else if (casSweepVotes(v, 0)) {
-                            sweep();
-                            break;
-                        }
-                    }
-                }
+                // sweep every SWEEP_THRESHOLD votes
+                if (pred.next != pred && s.next != s // recheck if offlist
+                    && (incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0)
+                    sweep();
             }
         }
     }
@@ -1193,35 +1229,10 @@
     }
 
     /**
-     * Main implementation of remove(Object)
-     */
-    private boolean findAndRemove(Object e) {
-        if (e != null) {
-            for (Node pred = null, p = head; p != null; ) {
-                Object item = p.item;
-                if (p.isData) {
-                    if (item != null && item != p && e.equals(item) &&
-                        p.tryMatchData()) {
-                        unsplice(pred, p);
-                        return true;
-                    }
-                }
-                else if (item == null)
-                    break;
-                pred = p;
-                if ((p = p.next) == pred) { // stale
-                    pred = null;
-                    p = head;
-                }
-            }
-        }
-        return false;
-    }
-
-    /**
      * Creates an initially empty {@code LinkedTransferQueue}.
      */
     public LinkedTransferQueue() {
+        head = tail = new Node();
     }
 
     /**
@@ -1234,8 +1245,18 @@
      *         of its elements are null
      */
     public LinkedTransferQueue(Collection<? extends E> c) {
-        this();
-        addAll(c);
+        Node h = null, t = null;
+        for (E e : c) {
+            Node newNode = new Node(Objects.requireNonNull(e));
+            if (h == null)
+                h = t = newNode;
+            else
+                t.appendRelaxed(t = newNode);
+        }
+        if (h == null)
+            h = t = new Node();
+        head = h;
+        tail = t;
     }
 
     /**
@@ -1367,15 +1388,12 @@
      * @throws IllegalArgumentException {@inheritDoc}
      */
     public int drainTo(Collection<? super E> c) {
-        if (c == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(c);
         if (c == this)
             throw new IllegalArgumentException();
         int n = 0;
-        for (E e; (e = poll()) != null;) {
+        for (E e; (e = poll()) != null; n++)
             c.add(e);
-            ++n;
-        }
         return n;
     }
 
@@ -1384,15 +1402,12 @@
      * @throws IllegalArgumentException {@inheritDoc}
      */
     public int drainTo(Collection<? super E> c, int maxElements) {
-        if (c == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(c);
         if (c == this)
             throw new IllegalArgumentException();
         int n = 0;
-        for (E e; n < maxElements && (e = poll()) != null;) {
+        for (E e; n < maxElements && (e = poll()) != null; n++)
             c.add(e);
-            ++n;
-        }
         return n;
     }
 
@@ -1414,7 +1429,7 @@
             for (Node p = head; p != null;) {
                 Object item = p.item;
                 if (p.isData) {
-                    if (item != null && item != p) {
+                    if (item != null) {
                         @SuppressWarnings("unchecked") E e = (E) item;
                         return e;
                     }
@@ -1442,7 +1457,7 @@
             for (Node p = head; p != null;) {
                 Object item = p.item;
                 if (p.isData) {
-                    if (item != null && item != p)
+                    if (item != null)
                         break;
                 }
                 else if (item == null)
@@ -1486,7 +1501,31 @@
      * @return {@code true} if this queue changed as a result of the call
      */
     public boolean remove(Object o) {
-        return findAndRemove(o);
+        if (o == null) return false;
+        restartFromHead: for (;;) {
+            for (Node p = head, pred = null; p != null; ) {
+                Node q = p.next;
+                final Object item;
+                if ((item = p.item) != null) {
+                    if (p.isData) {
+                        if (o.equals(item) && p.tryMatch(item, null)) {
+                            skipDeadNodes(pred, p, p, q);
+                            return true;
+                        }
+                        pred = p; p = q; continue;
+                    }
+                }
+                else if (!p.isData)
+                    break;
+                for (Node c = p;; q = p.next) {
+                    if (q == null || !q.isMatched()) {
+                        pred = skipDeadNodes(pred, c, p, q); p = q; break;
+                    }
+                    if (p == (p = q)) continue restartFromHead;
+                }
+            }
+            return false;
+        }
     }
 
     /**
@@ -1498,18 +1537,29 @@
      * @return {@code true} if this queue contains the specified element
      */
     public boolean contains(Object o) {
-        if (o != null) {
-            for (Node p = head; p != null; p = succ(p)) {
-                Object item = p.item;
-                if (p.isData) {
-                    if (item != null && item != p && o.equals(item))
-                        return true;
+        if (o == null) return false;
+        restartFromHead: for (;;) {
+            for (Node p = head, pred = null; p != null; ) {
+                Node q = p.next;
+                final Object item;
+                if ((item = p.item) != null) {
+                    if (p.isData) {
+                        if (o.equals(item))
+                            return true;
+                        pred = p; p = q; continue;
+                    }
                 }
-                else if (item == null)
+                else if (!p.isData)
                     break;
+                for (Node c = p;; q = p.next) {
+                    if (q == null || !q.isMatched()) {
+                        pred = skipDeadNodes(pred, c, p, q); p = q; break;
+                    }
+                    if (p == (p = q)) continue restartFromHead;
+                }
             }
+            return false;
         }
-        return false;
     }
 
     /**
@@ -1550,21 +1600,136 @@
      */
     private void readObject(java.io.ObjectInputStream s)
         throws java.io.IOException, ClassNotFoundException {
-        s.defaultReadObject();
-        for (;;) {
+
+        // Read in elements until trailing null sentinel found
+        Node h = null, t = null;
+        for (Object item; (item = s.readObject()) != null; ) {
             @SuppressWarnings("unchecked")
-            E item = (E) s.readObject();
-            if (item == null)
+            Node newNode = new Node((E) item);
+            if (h == null)
+                h = t = newNode;
+            else
+                t.appendRelaxed(t = newNode);
+        }
+        if (h == null)
+            h = t = new Node();
+        head = h;
+        tail = t;
+    }
+
+    /**
+     * @throws NullPointerException {@inheritDoc}
+     */
+    public boolean removeIf(Predicate<? super E> filter) {
+        Objects.requireNonNull(filter);
+        return bulkRemove(filter);
+    }
+
+    /**
+     * @throws NullPointerException {@inheritDoc}
+     */
+    public boolean removeAll(Collection<?> c) {
+        Objects.requireNonNull(c);
+        return bulkRemove(e -> c.contains(e));
+    }
+
+    /**
+     * @throws NullPointerException {@inheritDoc}
+     */
+    public boolean retainAll(Collection<?> c) {
+        Objects.requireNonNull(c);
+        return bulkRemove(e -> !c.contains(e));
+    }
+
+    public void clear() {
+        bulkRemove(e -> true);
+    }
+
+    /**
+     * Tolerate this many consecutive dead nodes before CAS-collapsing.
+     * Amortized cost of clear() is (1 + 1/MAX_HOPS) CASes per element.
+     */
+    private static final int MAX_HOPS = 8;
+
+    /** Implementation of bulk remove methods. */
+    @SuppressWarnings("unchecked")
+    private boolean bulkRemove(Predicate<? super E> filter) {
+        boolean removed = false;
+        restartFromHead: for (;;) {
+            int hops = MAX_HOPS;
+            // c will be CASed to collapse intervening dead nodes between
+            // pred (or head if null) and p.
+            for (Node p = head, c = p, pred = null, q; p != null; p = q) {
+                q = p.next;
+                final Object item; boolean pAlive;
+                if (pAlive = ((item = p.item) != null && p.isData)) {
+                    if (filter.test((E) item)) {
+                        if (p.tryMatch(item, null))
+                            removed = true;
+                        pAlive = false;
+                    }
+                }
+                else if (!p.isData && item == null)
+                    break;
+                if (pAlive || q == null || --hops == 0) {
+                    // p might already be self-linked here, but if so:
+                    // - CASing head will surely fail
+                    // - CASing pred's next will be useless but harmless.
+                    if ((c != p && !tryCasSuccessor(pred, c, c = p))
+                        || pAlive) {
+                        // if CAS failed or alive, abandon old pred
+                        hops = MAX_HOPS;
+                        pred = p;
+                        c = q;
+                    }
+                } else if (p == q)
+                    continue restartFromHead;
+            }
+            return removed;
+        }
+    }
+
+    /**
+     * Runs action on each element found during a traversal starting at p.
+     * If p is null, the action is not run.
+     */
+    @SuppressWarnings("unchecked")
+    void forEachFrom(Consumer<? super E> action, Node p) {
+        for (Node pred = null; p != null; ) {
+            Node q = p.next;
+            final Object item;
+            if ((item = p.item) != null) {
+                if (p.isData) {
+                    action.accept((E) item);
+                    pred = p; p = q; continue;
+                }
+            }
+            else if (!p.isData)
                 break;
-            else
-                offer(item);
+            for (Node c = p;; q = p.next) {
+                if (q == null || !q.isMatched()) {
+                    pred = skipDeadNodes(pred, c, p, q); p = q; break;
+                }
+                if (p == (p = q)) { pred = null; p = head; break; }
+            }
         }
     }
 
+    /**
+     * @throws NullPointerException {@inheritDoc}
+     */
+    public void forEach(Consumer<? super E> action) {
+        Objects.requireNonNull(action);
+        forEachFrom(action, head);
+    }
+
     // VarHandle mechanics
     private static final VarHandle HEAD;
     private static final VarHandle TAIL;
     private static final VarHandle SWEEPVOTES;
+    static final VarHandle ITEM;
+    static final VarHandle NEXT;
+    static final VarHandle WAITER;
     static {
         try {
             MethodHandles.Lookup l = MethodHandles.lookup();
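
The new removeIf/removeAll/retainAll/clear above all funnel into bulkRemove; from the caller's side they behave as for any Collection, while unlinking matched nodes as they traverse. A quick editorial sketch with a hypothetical class name:

import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

public class BulkRemoveSketch {                 // hypothetical demo class
    public static void main(String[] args) {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>(List.of(1, 2, 3, 4, 5, 6));

        q.removeIf(i -> i % 2 == 0);            // drops 2, 4, 6
        q.removeAll(List.of(1));                // drops 1
        q.retainAll(List.of(3));                // drops 5
        q.forEach(System.out::println);         // prints 3

        q.clear();                              // equivalent to removeIf(e -> true)
        System.out.println(q.isEmpty());        // true
    }
}
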
@@ -1574,6 +1739,9 @@
                                    Node.class);
             SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes",
                                          int.class);
+            ITEM = l.findVarHandle(Node.class, "item", Object.class);
+            NEXT = l.findVarHandle(Node.class, "next", Node.class);
+            WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
         } catch (ReflectiveOperationException e) {
             throw new Error(e);
         }