# HG changeset patch # User dl # Date 1486157099 28800 # Node ID 60e247b8d9a41ccadccaa083a20d2b617dc5b814 # Parent 65d31ea930aa960087cee23bcef37e3b9ec5f5b5 8169748: LinkedTransferQueue bulk remove is O(n^2) 8172023: Concurrent spliterators fail to handle exhaustion properly Reviewed-by: martin, psandoz, smarks diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/src/java.base/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java Fri Feb 03 13:24:59 2017 -0800 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java Fri Feb 03 13:24:59 2017 -0800 @@ -67,12 +67,12 @@ * asynchronous nature of these deques, determining the current number * of elements requires a traversal of the elements, and so may report * inaccurate results if this collection is modified during traversal. - * Additionally, the bulk operations {@code addAll}, - * {@code removeAll}, {@code retainAll}, {@code containsAll}, - * and {@code toArray} are not guaranteed - * to be performed atomically. For example, an iterator operating - * concurrently with an {@code addAll} operation might view only some - * of the added elements. + * + *

Bulk operations that add, remove, or examine multiple elements, + * such as {@link #addAll}, {@link #removeIf} or {@link #forEach}, + * are not guaranteed to be performed atomically. + * For example, a {@code forEach} traversal concurrent with an {@code + * addAll} operation might observe only some of the added elements. * *

This class and its iterator implement all of the optional * methods of the {@link Deque} and {@link Iterator} interfaces. @@ -683,8 +683,9 @@ */ final Node succ(Node p) { // TODO: should we skip deleted nodes here? - Node q = p.next; - return (p == q) ? first() : q; + if (p == (p = p.next)) + p = first(); + return p; } /** @@ -1416,65 +1417,55 @@ boolean exhausted; // true when no more nodes public Spliterator trySplit() { - Node p; - int b = batch; - int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; - if (!exhausted && - ((p = current) != null || (p = first()) != null)) { - if (p.item == null && p == (p = p.next)) - current = p = first(); - if (p != null && p.next != null) { - Object[] a = new Object[n]; - int i = 0; - do { - if ((a[i] = p.item) != null) - ++i; - if (p == (p = p.next)) - p = first(); - } while (p != null && i < n); - if ((current = p) == null) - exhausted = true; - if (i > 0) { - batch = i; - return Spliterators.spliterator - (a, 0, i, (Spliterator.ORDERED | - Spliterator.NONNULL | - Spliterator.CONCURRENT)); - } + Node p, q; + if ((p = current()) == null || (q = p.next) == null) + return null; + int i = 0, n = batch = Math.min(batch + 1, MAX_BATCH); + Object[] a = null; + do { + final E e; + if ((e = p.item) != null) { + if (a == null) + a = new Object[n]; + a[i++] = e; } - } - return null; + if (p == (p = q)) + p = first(); + } while (p != null && (q = p.next) != null && i < n); + setCurrent(p); + return (i == 0) ? null : + Spliterators.spliterator(a, 0, i, (Spliterator.ORDERED | + Spliterator.NONNULL | + Spliterator.CONCURRENT)); } public void forEachRemaining(Consumer action) { + Objects.requireNonNull(action); Node p; - if (action == null) throw new NullPointerException(); - if (!exhausted && - ((p = current) != null || (p = first()) != null)) { + if ((p = current()) != null) { + current = null; exhausted = true; do { - E e = p.item; + final E e; + if ((e = p.item) != null) + action.accept(e); if (p == (p = p.next)) p = first(); - if (e != null) - action.accept(e); } while (p != null); } } public boolean tryAdvance(Consumer action) { + Objects.requireNonNull(action); Node p; - if (action == null) throw new NullPointerException(); - if (!exhausted && - ((p = current) != null || (p = first()) != null)) { + if ((p = current()) != null) { E e; do { e = p.item; if (p == (p = p.next)) p = first(); } while (e == null && p != null); - if ((current = p) == null) - exhausted = true; + setCurrent(p); if (e != null) { action.accept(e); return true; @@ -1483,11 +1474,24 @@ return false; } + private void setCurrent(Node p) { + if ((current = p) == null) + exhausted = true; + } + + private Node current() { + Node p; + if ((p = current) == null && !exhausted) + setCurrent(p = first()); + return p; + } + public long estimateSize() { return Long.MAX_VALUE; } public int characteristics() { - return Spliterator.ORDERED | Spliterator.NONNULL | - Spliterator.CONCURRENT; + return (Spliterator.ORDERED | + Spliterator.NONNULL | + Spliterator.CONCURRENT); } } diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/src/java.base/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java Fri Feb 03 13:24:59 2017 -0800 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java Fri Feb 03 13:24:59 2017 -0800 @@ -81,12 +81,12 @@ * asynchronous nature of these queues, determining the current number * of elements requires a traversal of the elements, and so may report * inaccurate results if this collection is modified during traversal. - * Additionally, the bulk operations {@code addAll}, - * {@code removeAll}, {@code retainAll}, {@code containsAll}, - * and {@code toArray} are not guaranteed - * to be performed atomically. For example, an iterator operating - * concurrently with an {@code addAll} operation might view only some - * of the added elements. + * + *

Bulk operations that add, remove, or examine multiple elements, + * such as {@link #addAll}, {@link #removeIf} or {@link #forEach}, + * are not guaranteed to be performed atomically. + * For example, a {@code forEach} traversal concurrent with an {@code + * addAll} operation might observe only some of the added elements. * *

This class and its iterator implement all of the optional * methods of the {@link Queue} and {@link Iterator} interfaces. @@ -184,16 +184,30 @@ static final class Node { volatile E item; volatile Node next; - } + + /** + * Constructs a node holding item. Uses relaxed write because + * item can only be seen after piggy-backing publication via CAS. + */ + Node(E item) { + ITEM.set(this, item); + } + + /** Constructs a dead dummy node. */ + Node() {} - /** - * Returns a new node holding item. Uses relaxed write because item - * can only be seen after piggy-backing publication via CAS. - */ - static Node newNode(E item) { - Node node = new Node(); - ITEM.set(node, item); - return node; + void appendRelaxed(Node next) { + // assert next != null; + // assert this.next == null; + NEXT.set(this, next); + } + + boolean casItem(E cmp, E val) { + // assert item == cmp || item == null; + // assert cmp != null; + // assert val == null; + return ITEM.compareAndSet(this, cmp, val); + } } /** @@ -220,7 +234,7 @@ * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! - * - tail.next may or may not be self-pointing to tail. + * - tail.next may or may not be self-linked. */ private transient volatile Node tail; @@ -228,7 +242,7 @@ * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ public ConcurrentLinkedQueue() { - head = tail = newNode(null); + head = tail = new Node(); } /** @@ -243,16 +257,14 @@ public ConcurrentLinkedQueue(Collection c) { Node h = null, t = null; for (E e : c) { - Node newNode = newNode(Objects.requireNonNull(e)); + Node newNode = new Node(Objects.requireNonNull(e)); if (h == null) h = t = newNode; - else { - NEXT.set(t, newNode); - t = newNode; - } + else + t.appendRelaxed(t = newNode); } if (h == null) - h = t = newNode(null); + h = t = new Node(); head = h; tail = t; } @@ -287,14 +299,17 @@ * stale pointer that is now off the list. */ final Node succ(Node p) { - Node next = p.next; - return (p == next) ? head : next; + if (p == (p = p.next)) + p = head; + return p; } /** * Tries to CAS pred.next (or head, if pred is null) from c to p. + * Caller must ensure that we're not unlinking the trailing node. */ private boolean tryCasSuccessor(Node pred, Node c, Node p) { + // assert p != null; // assert c.item == null; // assert c != p; if (pred != null) @@ -307,6 +322,29 @@ } /** + * Collapse dead nodes between pred and q. + * @param pred the last known live node, or null if none + * @param c the first dead node + * @param p the last dead node + * @param q p.next: the next live node, or null if at end + * @return either old pred or p if pred dead or CAS failed + */ + private Node skipDeadNodes(Node pred, Node c, Node p, Node q) { + // assert pred != c; + // assert p != q; + // assert c.item == null; + // assert p.item == null; + if (q == null) { + // Never unlink trailing node. + if (c == p) return pred; + q = p; + } + return (tryCasSuccessor(pred, c, q) + && (pred == null || ITEM.get(pred) != null)) + ? pred : p; + } + + /** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * @@ -314,7 +352,7 @@ * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { - final Node newNode = newNode(Objects.requireNonNull(e)); + final Node newNode = new Node(Objects.requireNonNull(e)); for (Node t = tail, p = t;;) { Node q = p.next; @@ -346,8 +384,7 @@ restartFromHead: for (;;) { for (Node h = head, p = h, q;; p = q) { final E item; - if ((item = p.item) != null - && ITEM.compareAndSet(p, item, null)) { + if ((item = p.item) != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time @@ -451,19 +488,20 @@ public boolean contains(Object o) { if (o == null) return false; restartFromHead: for (;;) { - for (Node p = head, c = p, pred = null, q; p != null; p = q) { + for (Node p = head, pred = null; p != null; ) { + Node q = p.next; final E item; - if ((item = p.item) != null && o.equals(item)) - return true; - if (c != p && tryCasSuccessor(pred, c, p)) - c = p; - q = p.next; - if (item != null || c != p) { - pred = p; - c = q; + if ((item = p.item) != null) { + if (o.equals(item)) + return true; + pred = p; p = q; continue; } - else if (p == q) - continue restartFromHead; + for (Node c = p;; q = p.next) { + if (q == null || q.item != null) { + pred = skipDeadNodes(pred, c, p, q); p = q; break; + } + if (p == (p = q)) continue restartFromHead; + } } return false; } @@ -483,23 +521,22 @@ public boolean remove(Object o) { if (o == null) return false; restartFromHead: for (;;) { - for (Node p = head, c = p, pred = null, q; p != null; p = q) { + for (Node p = head, pred = null; p != null; ) { + Node q = p.next; final E item; - final boolean removed = - (item = p.item) != null - && o.equals(item) - && ITEM.compareAndSet(p, item, null); - if (c != p && tryCasSuccessor(pred, c, p)) - c = p; - if (removed) - return true; - q = p.next; - if (item != null || c != p) { - pred = p; - c = q; + if ((item = p.item) != null) { + if (o.equals(item) && p.casItem(item, null)) { + skipDeadNodes(pred, p, p, q); + return true; + } + pred = p; p = q; continue; } - else if (p == q) - continue restartFromHead; + for (Node c = p;; q = p.next) { + if (q == null || q.item != null) { + pred = skipDeadNodes(pred, c, p, q); p = q; break; + } + if (p == (p = q)) continue restartFromHead; + } } return false; } @@ -525,13 +562,11 @@ // Copy c into a private chain of Nodes Node beginningOfTheEnd = null, last = null; for (E e : c) { - Node newNode = newNode(Objects.requireNonNull(e)); + Node newNode = new Node(Objects.requireNonNull(e)); if (beginningOfTheEnd == null) beginningOfTheEnd = last = newNode; - else { - NEXT.set(last, newNode); - last = newNode; - } + else + last.appendRelaxed(last = newNode); } if (beginningOfTheEnd == null) return false; @@ -677,7 +712,7 @@ */ @SuppressWarnings("unchecked") public T[] toArray(T[] a) { - if (a == null) throw new NullPointerException(); + Objects.requireNonNull(a); return (T[]) toArrayInternal(a); } @@ -757,6 +792,8 @@ } } + // Default implementation of forEachRemaining is "good enough". + public void remove() { Node l = lastRet; if (l == null) throw new IllegalStateException(); @@ -806,16 +843,14 @@ Node h = null, t = null; for (Object item; (item = s.readObject()) != null; ) { @SuppressWarnings("unchecked") - Node newNode = newNode((E) item); + Node newNode = new Node((E) item); if (h == null) h = t = newNode; - else { - NEXT.set(t, newNode); - t = newNode; - } + else + t.appendRelaxed(t = newNode); } if (h == null) - h = t = newNode(null); + h = t = new Node(); head = h; tail = t; } @@ -828,62 +863,49 @@ boolean exhausted; // true when no more nodes public Spliterator trySplit() { - Node p; - int b = batch; - int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; - if (!exhausted && - ((p = current) != null || (p = first()) != null) && - p.next != null) { - Object[] a = new Object[n]; - int i = 0; - do { - if ((a[i] = p.item) != null) - ++i; - if (p == (p = p.next)) - p = first(); - } while (p != null && i < n); - if ((current = p) == null) - exhausted = true; - if (i > 0) { - batch = i; - return Spliterators.spliterator - (a, 0, i, (Spliterator.ORDERED | - Spliterator.NONNULL | - Spliterator.CONCURRENT)); + Node p, q; + if ((p = current()) == null || (q = p.next) == null) + return null; + int i = 0, n = batch = Math.min(batch + 1, MAX_BATCH); + Object[] a = null; + do { + final E e; + if ((e = p.item) != null) { + if (a == null) + a = new Object[n]; + a[i++] = e; } - } - return null; + if (p == (p = q)) + p = first(); + } while (p != null && (q = p.next) != null && i < n); + setCurrent(p); + return (i == 0) ? null : + Spliterators.spliterator(a, 0, i, (Spliterator.ORDERED | + Spliterator.NONNULL | + Spliterator.CONCURRENT)); } public void forEachRemaining(Consumer action) { - Node p; - if (action == null) throw new NullPointerException(); - if (!exhausted && - ((p = current) != null || (p = first()) != null)) { + Objects.requireNonNull(action); + final Node p; + if ((p = current()) != null) { + current = null; exhausted = true; - do { - E e = p.item; - if (p == (p = p.next)) - p = first(); - if (e != null) - action.accept(e); - } while (p != null); + forEachFrom(action, p); } } public boolean tryAdvance(Consumer action) { + Objects.requireNonNull(action); Node p; - if (action == null) throw new NullPointerException(); - if (!exhausted && - ((p = current) != null || (p = first()) != null)) { + if ((p = current()) != null) { E e; do { e = p.item; if (p == (p = p.next)) p = first(); } while (e == null && p != null); - if ((current = p) == null) - exhausted = true; + setCurrent(p); if (e != null) { action.accept(e); return true; @@ -892,11 +914,24 @@ return false; } + private void setCurrent(Node p) { + if ((current = p) == null) + exhausted = true; + } + + private Node current() { + Node p; + if ((p = current) == null && !exhausted) + setCurrent(p = first()); + return p; + } + public long estimateSize() { return Long.MAX_VALUE; } public int characteristics() { - return Spliterator.ORDERED | Spliterator.NONNULL | - Spliterator.CONCURRENT; + return (Spliterator.ORDERED | + Spliterator.NONNULL | + Spliterator.CONCURRENT); } } @@ -963,22 +998,22 @@ // c will be CASed to collapse intervening dead nodes between // pred (or head if null) and p. for (Node p = head, c = p, pred = null, q; p != null; p = q) { + q = p.next; final E item; boolean pAlive; if (pAlive = ((item = p.item) != null)) { if (filter.test(item)) { - if (ITEM.compareAndSet(p, item, null)) + if (p.casItem(item, null)) removed = true; pAlive = false; } } - if ((q = p.next) == null || pAlive || --hops == 0) { + if (pAlive || q == null || --hops == 0) { // p might already be self-linked here, but if so: // - CASing head will surely fail // - CASing pred's next will be useless but harmless. - if (c != p && tryCasSuccessor(pred, c, p)) - c = p; - // if c != p, CAS failed, so abandon old pred - if (pAlive || c != p) { + if ((c != p && !tryCasSuccessor(pred, c, c = p)) + || pAlive) { + // if CAS failed or alive, abandon old pred hops = MAX_HOPS; pred = p; c = q; @@ -991,34 +1026,39 @@ } /** + * Runs action on each element found during a traversal starting at p. + * If p is null, the action is not run. + */ + void forEachFrom(Consumer action, Node p) { + for (Node pred = null; p != null; ) { + Node q = p.next; + final E item; + if ((item = p.item) != null) { + action.accept(item); + pred = p; p = q; continue; + } + for (Node c = p;; q = p.next) { + if (q == null || q.item != null) { + pred = skipDeadNodes(pred, c, p, q); p = q; break; + } + if (p == (p = q)) { pred = null; p = head; break; } + } + } + } + + /** * @throws NullPointerException {@inheritDoc} */ public void forEach(Consumer action) { Objects.requireNonNull(action); - restartFromHead: for (;;) { - for (Node p = head, c = p, pred = null, q; p != null; p = q) { - final E item; - if ((item = p.item) != null) - action.accept(item); - if (c != p && tryCasSuccessor(pred, c, p)) - c = p; - q = p.next; - if (item != null || c != p) { - pred = p; - c = q; - } - else if (p == q) - continue restartFromHead; - } - return; - } + forEachFrom(action, head); } // VarHandle mechanics private static final VarHandle HEAD; private static final VarHandle TAIL; - private static final VarHandle ITEM; - private static final VarHandle NEXT; + static final VarHandle ITEM; + static final VarHandle NEXT; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingDeque.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingDeque.java Fri Feb 03 13:24:59 2017 -0800 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingDeque.java Fri Feb 03 13:24:59 2017 -0800 @@ -45,6 +45,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import java.util.function.Predicate; /** * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on @@ -63,9 +64,8 @@ * contains}, {@link #iterator iterator.remove()}, and the bulk * operations, all of which run in linear time. * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. + *

This class and its iterator implement all of the optional + * methods of the {@link Collection} and {@link Iterator} interfaces. * *

This class is a member of the * @@ -195,18 +195,7 @@ */ public LinkedBlockingDeque(Collection c) { this(Integer.MAX_VALUE); - final ReentrantLock lock = this.lock; - lock.lock(); // Never contended, but necessary for visibility - try { - for (E e : c) { - if (e == null) - throw new NullPointerException(); - if (!linkLast(new Node(e))) - throw new IllegalStateException("Deque full"); - } - } finally { - lock.unlock(); - } + addAll(c); } @@ -299,6 +288,7 @@ */ void unlink(Node x) { // assert lock.isHeldByCurrentThread(); + // assert x.item != null; Node p = x.prev; Node n = x.next; if (p == null) { @@ -834,46 +824,65 @@ } } - /* - * TODO: Add support for more efficient bulk operations. + /** + * Appends all of the elements in the specified collection to the end of + * this deque, in the order that they are returned by the specified + * collection's iterator. Attempts to {@code addAll} of a deque to + * itself result in {@code IllegalArgumentException}. * - * We don't want to acquire the lock for every iteration, but we - * also want other threads a chance to interact with the - * collection, especially when count is close to capacity. + * @param c the elements to be inserted into this deque + * @return {@code true} if this deque changed as a result of the call + * @throws NullPointerException if the specified collection or any + * of its elements are null + * @throws IllegalArgumentException if the collection is this deque + * @throws IllegalStateException if this deque is full + * @see #add(Object) */ + public boolean addAll(Collection c) { + if (c == this) + // As historically specified in AbstractQueue#addAll + throw new IllegalArgumentException(); -// /** -// * Adds all of the elements in the specified collection to this -// * queue. Attempts to addAll of a queue to itself result in -// * {@code IllegalArgumentException}. Further, the behavior of -// * this operation is undefined if the specified collection is -// * modified while the operation is in progress. -// * -// * @param c collection containing elements to be added to this queue -// * @return {@code true} if this queue changed as a result of the call -// * @throws ClassCastException {@inheritDoc} -// * @throws NullPointerException {@inheritDoc} -// * @throws IllegalArgumentException {@inheritDoc} -// * @throws IllegalStateException if this deque is full -// * @see #add(Object) -// */ -// public boolean addAll(Collection c) { -// if (c == null) -// throw new NullPointerException(); -// if (c == this) -// throw new IllegalArgumentException(); -// final ReentrantLock lock = this.lock; -// lock.lock(); -// try { -// boolean modified = false; -// for (E e : c) -// if (linkLast(e)) -// modified = true; -// return modified; -// } finally { -// lock.unlock(); -// } -// } + // Copy c into a private chain of Nodes + Node beg = null, end = null; + int n = 0; + for (E e : c) { + Objects.requireNonNull(e); + n++; + Node newNode = new Node(e); + if (beg == null) + beg = end = newNode; + else { + end.next = newNode; + newNode.prev = end; + end = newNode; + } + } + if (beg == null) + return false; + + // Atomically append the chain at the end + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (count + n <= capacity) { + beg.prev = last; + if (first == null) + first = beg; + else + last.next = beg; + last = end; + count += n; + notEmpty.signalAll(); + return true; + } + } finally { + lock.unlock(); + } + // Fall back to historic non-atomic implementation, failing + // with IllegalStateException when the capacity is exceeded. + return super.addAll(c); + } /** * Returns an array containing all of the elements in this deque, in @@ -992,7 +1001,9 @@ * - (possibly multiple) interior removed nodes (p.item == null) */ Node succ(Node p) { - return (p == (p = p.next)) ? first : p; + if (p == (p = p.next)) + p = first; + return p; } /** @@ -1049,7 +1060,9 @@ abstract Node nextNode(Node n); private Node succ(Node p) { - return (p == (p = nextNode(p))) ? firstNode() : p; + if (p == (p = nextNode(p))) + p = firstNode(); + return p; } AbstractItr() { @@ -1096,7 +1109,7 @@ lastRet = p; next = null; final ReentrantLock lock = LinkedBlockingDeque.this.lock; - final int batchSize = 32; + final int batchSize = 64; Object[] es = null; int n, len = 1; do { @@ -1175,11 +1188,10 @@ public Spliterator trySplit() { Node h; - int b = batch; - int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; if (!exhausted && ((h = current) != null || (h = first) != null) && h.next != null) { + int n = batch = Math.min(batch + 1, MAX_BATCH); Object[] a = new Object[n]; final ReentrantLock lock = LinkedBlockingDeque.this.lock; int i = 0; @@ -1199,13 +1211,11 @@ } else if ((est -= i) < 0L) est = 0L; - if (i > 0) { - batch = i; + if (i > 0) return Spliterators.spliterator (a, 0, i, (Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT)); - } } return null; } @@ -1223,7 +1233,8 @@ e = p.item; p = succ(p); } while (e == null && p != null); - exhausted = ((current = p) == null); + if ((current = p) == null) + exhausted = true; } finally { lock.unlock(); } @@ -1288,7 +1299,7 @@ // Extract batches of elements while holding the lock; then // run the action on the elements while not final ReentrantLock lock = this.lock; - final int batchSize = 32; // max number of elements per batch + final int batchSize = 64; // max number of elements per batch Object[] es = null; // container for batch of elements int n, len = 0; do { @@ -1315,6 +1326,83 @@ } /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean removeIf(Predicate filter) { + Objects.requireNonNull(filter); + return bulkRemove(filter); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean removeAll(Collection c) { + Objects.requireNonNull(c); + return bulkRemove(e -> c.contains(e)); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean retainAll(Collection c) { + Objects.requireNonNull(c); + return bulkRemove(e -> !c.contains(e)); + } + + /** Implementation of bulk remove methods. */ + @SuppressWarnings("unchecked") + private boolean bulkRemove(Predicate filter) { + boolean removed = false; + Node p = null; + final ReentrantLock lock = this.lock; + Node[] nodes = null; + int n, len = 0; + do { + // 1. Extract batch of up to 64 elements while holding the lock. + long deathRow = 0; // "bitset" of size 64 + lock.lock(); + try { + if (nodes == null) { + if (p == null) p = first; + for (Node q = p; q != null; q = succ(q)) + if (q.item != null && ++len == 64) + break; + nodes = (Node[]) new Node[len]; + } + for (n = 0; p != null && n < len; p = succ(p)) + nodes[n++] = p; + } finally { + lock.unlock(); + } + + // 2. Run the filter on the elements while lock is free. + for (int i = 0; i < n; i++) { + final E e; + if ((e = nodes[i].item) != null && filter.test(e)) + deathRow |= 1L << i; + } + + // 3. Remove any filtered elements while holding the lock. + if (deathRow != 0) { + lock.lock(); + try { + for (int i = 0; i < n; i++) { + final Node q; + if ((deathRow & (1L << i)) != 0L + && (q = nodes[i]).item != null) { + unlink(q); + removed = true; + } + } + } finally { + lock.unlock(); + } + } + } while (n > 0 && p != null); + return removed; + } + + /** * Saves this deque to a stream (that is, serializes it). * * @param s the stream diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingQueue.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingQueue.java Fri Feb 03 13:24:59 2017 -0800 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingQueue.java Fri Feb 03 13:24:59 2017 -0800 @@ -46,6 +46,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import java.util.function.Predicate; /** * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on @@ -66,9 +67,8 @@ * dynamically created upon each insertion unless this would bring the * queue above capacity. * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. + *

This class and its iterator implement all of the optional + * methods of the {@link Collection} and {@link Iterator} interfaces. * *

This class is a member of the * @@ -507,17 +507,17 @@ } /** - * Unlinks interior Node p with predecessor trail. + * Unlinks interior Node p with predecessor pred. */ - void unlink(Node p, Node trail) { + void unlink(Node p, Node pred) { // assert putLock.isHeldByCurrentThread(); // assert takeLock.isHeldByCurrentThread(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; - trail.next = p.next; + pred.next = p.next; if (last == p) - last = trail; + last = pred; if (count.getAndDecrement() == capacity) notFull.signal(); } @@ -537,11 +537,11 @@ if (o == null) return false; fullyLock(); try { - for (Node trail = head, p = trail.next; + for (Node pred = head, p = pred.next; p != null; - trail = p, p = p.next) { + pred = p, p = p.next) { if (o.equals(p.item)) { - unlink(p, trail); + unlink(p, pred); return true; } } @@ -740,7 +740,9 @@ * - (possibly multiple) interior removed nodes (p.item == null) */ Node succ(Node p) { - return (p == (p = p.next)) ? head.next : p; + if (p == (p = p.next)) + p = head.next; + return p; } /** @@ -756,16 +758,18 @@ return new Itr(); } + /** + * Weakly-consistent iterator. + * + * Lazily updated ancestor field provides expected O(1) remove(), + * but still O(n) in the worst case, whenever the saved ancestor + * is concurrently deleted. + */ private class Itr implements Iterator { - /* - * Basic weakly-consistent iterator. At all times hold the next - * item to hand out so that if hasNext() reports true, we will - * still have it to return even if lost race with a take etc. - */ - - private Node next; - private E nextItem; + private Node next; // Node holding nextItem + private E nextItem; // next item to hand out private Node lastRet; + private Node ancestor; // Helps unlink lastRet on remove() Itr() { fullyLock(); @@ -807,7 +811,7 @@ if ((p = next) == null) return; lastRet = p; next = null; - final int batchSize = 32; + final int batchSize = 64; Object[] es = null; int n, len = 1; do { @@ -840,19 +844,17 @@ } public void remove() { - if (lastRet == null) + Node p = lastRet; + if (p == null) throw new IllegalStateException(); + lastRet = null; fullyLock(); try { - Node node = lastRet; - lastRet = null; - for (Node trail = head, p = trail.next; - p != null; - trail = p, p = p.next) { - if (p == node) { - unlink(p, trail); - break; - } + if (p.item != null) { + if (ancestor == null) + ancestor = head; + ancestor = findPred(p, ancestor); + unlink(p, ancestor); } } finally { fullyUnlock(); @@ -877,11 +879,10 @@ public Spliterator trySplit() { Node h; - int b = batch; - int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; if (!exhausted && ((h = current) != null || (h = head.next) != null) && h.next != null) { + int n = batch = Math.min(batch + 1, MAX_BATCH); Object[] a = new Object[n]; int i = 0; Node p = current; @@ -900,13 +901,11 @@ } else if ((est -= i) < 0L) est = 0L; - if (i > 0) { - batch = i; + if (i > 0) return Spliterators.spliterator (a, 0, i, (Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT)); - } } return null; } @@ -923,7 +922,8 @@ e = p.item; p = succ(p); } while (e == null && p != null); - exhausted = ((current = p) == null); + if ((current = p) == null) + exhausted = true; } finally { fullyUnlock(); } @@ -987,7 +987,7 @@ void forEachFrom(Consumer action, Node p) { // Extract batches of elements while holding the lock; then // run the action on the elements while not - final int batchSize = 32; // max number of elements per batch + final int batchSize = 64; // max number of elements per batch Object[] es = null; // container for batch of elements int n, len = 0; do { @@ -1014,6 +1014,97 @@ } /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean removeIf(Predicate filter) { + Objects.requireNonNull(filter); + return bulkRemove(filter); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean removeAll(Collection c) { + Objects.requireNonNull(c); + return bulkRemove(e -> c.contains(e)); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean retainAll(Collection c) { + Objects.requireNonNull(c); + return bulkRemove(e -> !c.contains(e)); + } + + /** + * Returns the predecessor of live node p, given a node that was + * once a live ancestor of p (or head); allows unlinking of p. + */ + Node findPred(Node p, Node ancestor) { + // assert p.item != null; + if (ancestor.item == null) + ancestor = head; + // Fails with NPE if precondition not satisfied + for (Node q; (q = ancestor.next) != p; ) + ancestor = q; + return ancestor; + } + + /** Implementation of bulk remove methods. */ + @SuppressWarnings("unchecked") + private boolean bulkRemove(Predicate filter) { + boolean removed = false; + Node p = null, ancestor = head; + Node[] nodes = null; + int n, len = 0; + do { + // 1. Extract batch of up to 64 elements while holding the lock. + long deathRow = 0; // "bitset" of size 64 + fullyLock(); + try { + if (nodes == null) { + if (p == null) p = head.next; + for (Node q = p; q != null; q = succ(q)) + if (q.item != null && ++len == 64) + break; + nodes = (Node[]) new Node[len]; + } + for (n = 0; p != null && n < len; p = succ(p)) + nodes[n++] = p; + } finally { + fullyUnlock(); + } + + // 2. Run the filter on the elements while lock is free. + for (int i = 0; i < n; i++) { + final E e; + if ((e = nodes[i].item) != null && filter.test(e)) + deathRow |= 1L << i; + } + + // 3. Remove any filtered elements while holding the lock. + if (deathRow != 0) { + fullyLock(); + try { + for (int i = 0; i < n; i++) { + final Node q; + if ((deathRow & (1L << i)) != 0L + && (q = nodes[i]).item != null) { + ancestor = findPred(q, ancestor); + unlink(q, ancestor); + removed = true; + } + } + } finally { + fullyUnlock(); + } + } + } while (n > 0 && p != null); + return removed; + } + + /** * Saves this queue to a stream (that is, serializes it). * * @param s the stream diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java Fri Feb 03 13:24:59 2017 -0800 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java Fri Feb 03 13:24:59 2017 -0800 @@ -42,11 +42,13 @@ import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Queue; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; +import java.util.function.Predicate; /** * An unbounded {@link TransferQueue} based on linked nodes. @@ -61,16 +63,15 @@ * asynchronous nature of these queues, determining the current number * of elements requires a traversal of the elements, and so may report * inaccurate results if this collection is modified during traversal. - * Additionally, the bulk operations {@code addAll}, - * {@code removeAll}, {@code retainAll}, {@code containsAll}, - * and {@code toArray} are not guaranteed - * to be performed atomically. For example, an iterator operating - * concurrently with an {@code addAll} operation might view only some - * of the added elements. * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. + *

Bulk operations that add, remove, or examine multiple elements, + * such as {@link #addAll}, {@link #removeIf} or {@link #forEach}, + * are not guaranteed to be performed atomically. + * For example, a {@code forEach} traversal concurrent with an {@code + * addAll} operation might observe only some of the added elements. + * + *

This class and its iterator implement all of the optional + * methods of the {@link Collection} and {@link Iterator} interfaces. * *

Memory consistency effects: As with other concurrent * collections, actions in a thread prior to placing an object into a @@ -158,9 +159,8 @@ * correctly perform enqueue and dequeue operations by traversing * from a pointer to the initial node; CASing the item of the * first unmatched node on match and CASing the next field of the - * trailing node on appends. (Plus some special-casing when - * initially empty). While this would be a terrible idea in - * itself, it does have the benefit of not requiring ANY atomic + * trailing node on appends. While this would be a terrible idea + * in itself, it does have the benefit of not requiring ANY atomic * updates on head/tail fields. * * We introduce here an approach that lies between the extremes of @@ -196,15 +196,15 @@ * with a given probability per traversal step. * * In any strategy along these lines, because CASes updating - * fields may fail, the actual slack may exceed targeted - * slack. However, they may be retried at any time to maintain - * targets. Even when using very small slack values, this - * approach works well for dual queues because it allows all - * operations up to the point of matching or appending an item - * (hence potentially allowing progress by another thread) to be - * read-only, thus not introducing any further contention. As - * described below, we implement this by performing slack - * maintenance retries only after these points. + * fields may fail, the actual slack may exceed targeted slack. + * However, they may be retried at any time to maintain targets. + * Even when using very small slack values, this approach works + * well for dual queues because it allows all operations up to the + * point of matching or appending an item (hence potentially + * allowing progress by another thread) to be read-only, thus not + * introducing any further contention. As described below, we + * implement this by performing slack maintenance retries only + * after these points. * * As an accompaniment to such techniques, traversal overhead can * be further reduced without increasing contention of head @@ -223,7 +223,7 @@ * (Similar issues arise in non-GC environments.) To cope with * this in our implementation, upon CASing to advance the head * pointer, we set the "next" link of the previous head to point - * only to itself; thus limiting the length of connected dead lists. + * only to itself; thus limiting the length of chains of dead nodes. * (We also take similar care to wipe out possibly garbage * retaining values held in other Node fields.) However, doing so * adds some further complexity to traversal: If any "next" @@ -266,15 +266,6 @@ * interior nodes) except in the case of cancellation/removal (see * below). * - * We allow both the head and tail fields to be null before any - * nodes are enqueued; initializing upon first append. This - * simplifies some other logic, as well as providing more - * efficient explicit control paths instead of letting JVMs insert - * implicit NullPointerExceptions when they are null. While not - * currently fully implemented, we also leave open the possibility - * of re-nulling these fields when empty (which is complicated to - * arrange, for little benefit.) - * * All enqueue/dequeue operations are handled by the single method * "xfer" with parameters indicating whether to act as some form * of offer, put, poll, take, or transfer (each possibly with @@ -282,44 +273,40 @@ * method outweighs the code bulk and maintenance problems of * using separate methods for each case. * - * Operation consists of up to three phases. The first is - * implemented within method xfer, the second in tryAppend, and - * the third in method awaitMatch. + * Operation consists of up to two phases. The first is implemented + * in method xfer, the second in method awaitMatch. * - * 1. Try to match an existing node + * 1. Traverse until matching or appending (method xfer) * - * Starting at head, skip already-matched nodes until finding - * an unmatched node of opposite mode, if one exists, in which - * case matching it and returning, also if necessary updating - * head to one past the matched node (or the node itself if the - * list has no other unmatched nodes). If the CAS misses, then - * a loop retries advancing head by two steps until either - * success or the slack is at most two. By requiring that each - * attempt advances head by two (if applicable), we ensure that - * the slack does not grow without bound. Traversals also check - * if the initial head is now off-list, in which case they - * start at the new head. + * Conceptually, we simply traverse all nodes starting from head. + * If we encounter an unmatched node of opposite mode, we match + * it and return, also updating head (by at least 2 hops) to + * one past the matched node (or the node itself if it's the + * pinned trailing node). Traversals also check for the + * possibility of falling off-list, in which case they restart. * - * If no candidates are found and the call was untimed - * poll/offer, (argument "how" is NOW) return. - * - * 2. Try to append a new node (method tryAppend) + * If the trailing node of the list is reached, a match is not + * possible. If this call was untimed poll or tryTransfer + * (argument "how" is NOW), return empty-handed immediately. + * Else a new node is CAS-appended. On successful append, if + * this call was ASYNC (e.g. offer), an element was + * successfully added to the end of the queue and we return. * - * Starting at current tail pointer, find the actual last node - * and try to append a new node (or if head was null, establish - * the first node). Nodes can be appended only if their - * predecessors are either already matched or are of the same - * mode. If we detect otherwise, then a new node with opposite - * mode must have been appended during traversal, so we must - * restart at phase 1. The traversal and update steps are - * otherwise similar to phase 1: Retrying upon CAS misses and - * checking for staleness. In particular, if a self-link is - * encountered, then we can safely jump to a node on the list - * by continuing the traversal at current head. + * Of course, this naive traversal is O(n) when no match is + * possible. We optimize the traversal by maintaining a tail + * pointer, which is expected to be "near" the end of the list. + * It is only safe to fast-forward to tail (in the presence of + * arbitrary concurrent changes) if it is pointing to a node of + * the same mode, even if it is dead (in this case no preceding + * node could still be matchable by this traversal). If we + * need to restart due to falling off-list, we can again + * fast-forward to tail, but only if it has changed since the + * last traversal (else we might loop forever). If tail cannot + * be used, traversal starts at head (but in this case we + * expect to be able to match near head). As with head, we + * CAS-advance the tail pointer by at least two hops. * - * On successful append, if the call was ASYNC, return. - * - * 3. Await match or cancellation (method awaitMatch) + * 2. Await match or cancellation (method awaitMatch) * * Wait for another thread to match node; instead cancelling if * the current thread was interrupted or the wait timed out. On @@ -373,12 +360,12 @@ * from, the head of list. * * Without taking these into account, it would be possible for an - * unbounded number of supposedly removed nodes to remain - * reachable. Situations leading to such buildup are uncommon but - * can occur in practice; for example when a series of short timed - * calls to poll repeatedly time out but never otherwise fall off - * the list because of an untimed call to take at the front of the - * queue. + * unbounded number of supposedly removed nodes to remain reachable. + * Situations leading to such buildup are uncommon but can occur + * in practice; for example when a series of short timed calls to + * poll repeatedly time out at the trailing node but otherwise + * never fall off the list because of an untimed call to take() at + * the front of the queue. * * When these cases arise, rather than always retraversing the * entire list to find an actual predecessor to unlink (which @@ -391,10 +378,9 @@ * We perform sweeps by the thread hitting threshold (rather than * background threads or by spreading work to other threads) * because in the main contexts in which removal occurs, the - * caller is already timed-out, cancelled, or performing a - * potentially O(n) operation (e.g. remove(x)), none of which are - * time-critical enough to warrant the overhead that alternatives - * would impose on other threads. + * caller is timed-out or cancelled, which are not time-critical + * enough to warrant the overhead that alternatives would impose + * on other threads. * * Because the sweepVotes estimate is conservative, and because * nodes become unlinked "naturally" as they fall off the head of @@ -406,6 +392,13 @@ * quiescent queues. The value defined below was chosen * empirically to balance these under various timeout scenarios. * + * Because traversal operations on the linked list of nodes are a + * natural opportunity to sweep dead nodes, we generally do so, + * including all the operations that might remove elements as they + * traverse, such as removeIf and Iterator.remove. This largely + * eliminates long chains of dead interior nodes, except from + * cancelled or timed out blocking operations. + * * Note that we cannot self-link unlinked interior nodes during * sweeps. However, the associated garbage chains terminate when * some successor ultimately falls off the head of the list and is @@ -446,54 +439,71 @@ /** * Queue nodes. Uses Object, not E, for items to allow forgetting - * them after use. Relies heavily on VarHandles to minimize - * unnecessary ordering constraints: Writes that are intrinsically - * ordered wrt other accesses or CASes use simple relaxed forms. + * them after use. Writes that are intrinsically ordered wrt + * other accesses or CASes use simple relaxed forms. */ static final class Node { final boolean isData; // false if this is a request node volatile Object item; // initially non-null if isData; CASed to match volatile Node next; - volatile Thread waiter; // null until waiting + volatile Thread waiter; // null when not waiting for a match - // CAS methods for fields + /** + * Constructs a data node holding item if item is non-null, + * else a request node. Uses relaxed write because item can + * only be seen after piggy-backing publication via CAS. + */ + Node(Object item) { + ITEM.set(this, item); + isData = (item != null); + } + + /** Constructs a (matched data) dummy node. */ + Node() { + isData = true; + } + final boolean casNext(Node cmp, Node val) { + // assert val != null; return NEXT.compareAndSet(this, cmp, val); } final boolean casItem(Object cmp, Object val) { - // assert cmp == null || cmp.getClass() != Node.class; + // assert isData == (cmp != null); + // assert isData == (val == null); + // assert !(cmp instanceof Node); return ITEM.compareAndSet(this, cmp, val); } /** - * Constructs a new node. Uses relaxed write because item can - * only be seen after publication via casNext. - */ - Node(Object item, boolean isData) { - ITEM.set(this, item); // relaxed write - this.isData = isData; - } - - /** * Links node to itself to avoid garbage retention. Called * only after CASing head field, so uses relaxed write. */ - final void forgetNext() { - NEXT.set(this, this); + final void selfLink() { + // assert isMatched(); + NEXT.setRelease(this, this); + } + + final void appendRelaxed(Node next) { + // assert next != null; + // assert this.next == null; + NEXT.set(this, next); } /** - * Sets item to self and waiter to null, to avoid garbage - * retention after matching or cancelling. Uses relaxed writes - * because order is already constrained in the only calling - * contexts: item is forgotten only after volatile/atomic - * mechanics that extract items. Similarly, clearing waiter - * follows either CAS or return from park (if ever parked; - * else we don't care). + * Sets item (of a request node) to self and waiter to null, + * to avoid garbage retention after matching or cancelling. + * Uses relaxed writes because order is already constrained in + * the only calling contexts: item is forgotten only after + * volatile/atomic mechanics that extract items, and visitors + * of request nodes only ever check whether item is null. + * Similarly, clearing waiter follows either CAS or return + * from park (if ever parked; else we don't care). */ final void forgetContents() { - ITEM.set(this, this); + // assert isMatched(); + if (!isData) + ITEM.set(this, this); WAITER.set(this, null); } @@ -502,15 +512,16 @@ * case of artificial matches due to cancellation. */ final boolean isMatched() { - Object x = item; - return (x == this) || ((x == null) == isData); + return isData == (item == null); } - /** - * Returns true if this is an unmatched request node. - */ - final boolean isUnmatchedRequest() { - return !isData && item == null; + /** Tries to CAS-match this node; if successful, wakes waiter. */ + final boolean tryMatch(Object cmp, Object val) { + if (casItem(cmp, val)) { + LockSupport.unpark(waiter); + return true; + } + return false; } /** @@ -520,52 +531,46 @@ */ final boolean cannotPrecede(boolean haveData) { boolean d = isData; - Object x; - return d != haveData && (x = item) != this && (x != null) == d; - } - - /** - * Tries to artificially match a data node -- used by remove. - */ - final boolean tryMatchData() { - // assert isData; - Object x = item; - if (x != null && x != this && casItem(x, null)) { - LockSupport.unpark(waiter); - return true; - } - return false; + return d != haveData && d != (item == null); } private static final long serialVersionUID = -3375979862319811754L; - - // VarHandle mechanics - private static final VarHandle ITEM; - private static final VarHandle NEXT; - private static final VarHandle WAITER; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - ITEM = l.findVarHandle(Node.class, "item", Object.class); - NEXT = l.findVarHandle(Node.class, "next", Node.class); - WAITER = l.findVarHandle(Node.class, "waiter", Thread.class); - } catch (ReflectiveOperationException e) { - throw new Error(e); - } - } } - /** head of the queue; null until first enqueue */ + /** + * A node from which the first live (non-matched) node (if any) + * can be reached in O(1) time. + * Invariants: + * - all live nodes are reachable from head via .next + * - head != null + * - (tmp = head).next != tmp || tmp != head + * Non-invariants: + * - head may or may not be live + * - it is permitted for tail to lag behind head, that is, for tail + * to not be reachable from head! + */ transient volatile Node head; - /** tail of the queue; null until first append */ + /** + * A node from which the last node on list (that is, the unique + * node with node.next == null) can be reached in O(1) time. + * Invariants: + * - the last node is always reachable from tail via .next + * - tail != null + * Non-invariants: + * - tail may or may not be live + * - it is permitted for tail to lag behind head, that is, for tail + * to not be reachable from head! + * - tail.next may or may not be self-linked. + */ private transient volatile Node tail; - /** The number of apparent failures to unsplice removed nodes */ + /** The number of apparent failures to unsplice cancelled nodes */ private transient volatile int sweepVotes; - // CAS methods for fields private boolean casTail(Node cmp, Node val) { + // assert cmp != null; + // assert val != null; return TAIL.compareAndSet(this, cmp, val); } @@ -573,13 +578,71 @@ return HEAD.compareAndSet(this, cmp, val); } - private boolean casSweepVotes(int cmp, int val) { - return SWEEPVOTES.compareAndSet(this, cmp, val); + /** Atomic version of ++sweepVotes. */ + private int incSweepVotes() { + return (int) SWEEPVOTES.getAndAdd(this, 1) + 1; + } + + /** + * Tries to CAS pred.next (or head, if pred is null) from c to p. + * Caller must ensure that we're not unlinking the trailing node. + */ + private boolean tryCasSuccessor(Node pred, Node c, Node p) { + // assert p != null; + // assert c.isData != (c.item != null); + // assert c != p; + if (pred != null) + return pred.casNext(c, p); + if (casHead(c, p)) { + c.selfLink(); + return true; + } + return false; } - /* - * Possible values for "how" argument in xfer method. + /** + * Collapses dead (matched) nodes between pred and q. + * @param pred the last known live node, or null if none + * @param c the first dead node + * @param p the last dead node + * @param q p.next: the next live node, or null if at end + * @return pred if pred still alive and CAS succeeded; else p */ + private Node skipDeadNodes(Node pred, Node c, Node p, Node q) { + // assert pred != c; + // assert p != q; + // assert c.isMatched(); + // assert p.isMatched(); + if (q == null) { + // Never unlink trailing node. + if (c == p) return pred; + q = p; + } + return (tryCasSuccessor(pred, c, q) + && (pred == null || !pred.isMatched())) + ? pred : p; + } + + /** + * Collapses dead (matched) nodes from h (which was once head) to p. + * Caller ensures all nodes from h up to and including p are dead. + */ + private void skipDeadNodesNearHead(Node h, Node p) { + // assert h != null; + // assert h != p; + // assert p.isMatched(); + for (;;) { + final Node q; + if ((q = p.next) == null) break; + else if (!q.isMatched()) { p = q; break; } + else if (p == (p = q)) return; + } + if (casHead(h, p)) + h.selfLink(); + } + + /* Possible values for "how" argument in xfer method. */ + private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take @@ -595,84 +658,32 @@ * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */ + @SuppressWarnings("unchecked") private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); - Node s = null; // the node to append, if needed - retry: - for (;;) { // restart on append race - - for (Node h = head, p = h; p != null;) { // find & match first node - boolean isData = p.isData; - Object item = p.item; - if (item != p && (item != null) == isData) { // unmatched - if (isData == haveData) // can't match - break; - if (p.casItem(item, e)) { // match - for (Node q = p; q != h;) { - Node n = q.next; // update by 2 unless singleton - if (head == h && casHead(h, n == null ? q : n)) { - h.forgetNext(); - break; - } // advance and retry - if ((h = head) == null || - (q = h.next) == null || !q.isMatched()) - break; // unless slack < 2 - } - LockSupport.unpark(p.waiter); - @SuppressWarnings("unchecked") E itemE = (E) item; - return itemE; + restart: for (Node s = null, t = null, h = null;;) { + for (Node p = (t != (t = tail) && t.isData == haveData) ? t + : (h = head);; ) { + final Node q; final Object item; + if (p.isData != haveData + && haveData == ((item = p.item) == null)) { + if (h == null) h = head; + if (p.tryMatch(item, e)) { + if (h != p) skipDeadNodesNearHead(h, p); + return (E) item; } } - Node n = p.next; - p = (p != n) ? n : (h = head); // Use head if p offlist - } - - if (how != NOW) { // No matches available - if (s == null) - s = new Node(e, haveData); - Node pred = tryAppend(s, haveData); - if (pred == null) - continue retry; // lost race vs opposite mode - if (how != ASYNC) - return awaitMatch(s, pred, e, (how == TIMED), nanos); - } - return e; // not waiting - } - } - - /** - * Tries to append node s as tail. - * - * @param s the node to append - * @param haveData true if appending in data mode - * @return null on failure due to losing race with append in - * different mode, else s's predecessor, or s itself if no - * predecessor - */ - private Node tryAppend(Node s, boolean haveData) { - for (Node t = tail, p = t;;) { // move p to last node and append - Node n, u; // temps for reads of next & tail - if (p == null && (p = head) == null) { - if (casHead(null, s)) - return s; // initialize - } - else if (p.cannotPrecede(haveData)) - return null; // lost race vs opposite mode - else if ((n = p.next) != null) // not last; keep traversing - p = p != t && t != (u = tail) ? (t = u) : // stale tail - (p != n) ? n : null; // restart if off list - else if (!p.casNext(null, s)) - p = p.next; // re-read on CAS failure - else { - if (p != t) { // update if slack now >= 2 - while ((tail != t || !casTail(t, s)) && - (t = tail) != null && - (s = t.next) != null && // advance and retry - (s = s.next) != null && s != t); + if ((q = p.next) == null) { + if (how == NOW) return e; + if (s == null) s = new Node(e); + if (!p.casNext(null, s)) continue; + if (p != t) casTail(t, s); + if (how == ASYNC) return e; + return awaitMatch(s, p, e, (how == TIMED), nanos); } - return p; + if (p == (p = q)) continue restart; } } } @@ -681,9 +692,9 @@ * Spins/yields/blocks until node s is matched or caller gives up. * * @param s the waiting node - * @param pred the predecessor of s, or s itself if it has no - * predecessor, or null if unknown (the null case does not occur - * in any current calls but may in possible future extensions) + * @param pred the predecessor of s, or null if unknown (the null + * case does not occur in any current calls but may in possible + * future extensions) * @param e the comparison value for checking match * @param timed if true, wait only until timeout elapses * @param nanos timeout in nanosecs, used only if timed is true @@ -696,17 +707,20 @@ ThreadLocalRandom randomYields = null; // bound if needed for (;;) { - Object item = s.item; - if (item != e) { // matched + final Object item; + if ((item = s.item) != e) { // matched // assert item != s; s.forgetContents(); // avoid garbage @SuppressWarnings("unchecked") E itemE = (E) item; return itemE; } else if (w.isInterrupted() || (timed && nanos <= 0L)) { - unsplice(pred, s); // try to unlink and cancel - if (s.casItem(e, s)) // return normally if lost CAS + // try to cancel and unlink + if (s.casItem(e, s.isData ? null : s)) { + unsplice(pred, s); return e; + } + // return normally if lost CAS } else if (spins < 0) { // establish spins at/near front if ((spins = spinsFor(pred, s.isData)) > 0) @@ -750,34 +764,33 @@ /* -------------- Traversal methods -------------- */ /** - * Returns the successor of p, or the head node if p.next has been - * linked to self, which will only be true if traversing with a - * stale pointer that is now off the list. - */ - final Node succ(Node p) { - Node next = p.next; - return (p == next) ? head : next; - } - - /** * Returns the first unmatched data node, or null if none. - * Callers must recheck if the returned node's item field is null - * or self-linked before using. + * Callers must recheck if the returned node is unmatched + * before using. */ final Node firstDataNode() { + Node first = null; restartFromHead: for (;;) { - for (Node p = head; p != null;) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p) - return p; + Node h = head, p = h; + for (; p != null;) { + final Object item; + if ((item = p.item) != null) { + if (p.isData) { + first = p; + break; + } } - else if (item == null) + else if (!p.isData) break; - if (p == (p = p.next)) + final Node q; + if ((q = p.next) == null) + break; + if (p == (p = q)) continue restartFromHead; } - return null; + if (p != h && casHead(h, p)) + h.selfLink(); + return first; } } @@ -810,7 +823,7 @@ for (Node p = head; p != null;) { Object item = p.item; if (p.isData) { - if (item != null && item != p) { + if (item != null) { if (a == null) a = new String[4]; else if (size == a.length) @@ -839,7 +852,7 @@ for (Node p = head; p != null;) { Object item = p.item; if (p.isData) { - if (item != null && item != p) { + if (item != null) { if (x == null) x = new Object[4]; else if (size == x.length) @@ -918,76 +931,50 @@ */ @SuppressWarnings("unchecked") public T[] toArray(T[] a) { - if (a == null) throw new NullPointerException(); + Objects.requireNonNull(a); return (T[]) toArrayInternal(a); } + /** + * Weakly-consistent iterator. + * + * Lazily updated ancestor is expected to be amortized O(1) remove(), + * but O(n) in the worst case, when lastRet is concurrently deleted. + */ final class Itr implements Iterator { private Node nextNode; // next node to return item for private E nextItem; // the corresponding item private Node lastRet; // last returned node, to support remove - private Node lastPred; // predecessor to unlink lastRet + private Node ancestor; // Helps unlink lastRet on remove() /** - * Moves to next node after prev, or first node if prev null. + * Moves to next node after pred, or first node if pred null. */ - private void advance(Node prev) { - /* - * To track and avoid buildup of deleted nodes in the face - * of calls to both Queue.remove and Itr.remove, we must - * include variants of unsplice and sweep upon each - * advance: Upon Itr.remove, we may need to catch up links - * from lastPred, and upon other removes, we might need to - * skip ahead from stale nodes and unsplice deleted ones - * found while advancing. - */ - - Node r, b; // reset lastPred upon possible deletion of lastRet - if ((r = lastRet) != null && !r.isMatched()) - lastPred = r; // next lastPred is old lastRet - else if ((b = lastPred) == null || b.isMatched()) - lastPred = null; // at start of list - else { - Node s, n; // help with removal of lastPred.next - while ((s = b.next) != null && - s != b && s.isMatched() && - (n = s.next) != null && n != s) - b.casNext(s, n); + @SuppressWarnings("unchecked") + private void advance(Node pred) { + for (Node p = (pred == null) ? head : pred.next, c = p; + p != null; ) { + final Object item; + if ((item = p.item) != null && p.isData) { + nextNode = p; + nextItem = (E) item; + if (c != p) + tryCasSuccessor(pred, c, p); + return; + } + else if (!p.isData && item == null) + break; + if (c != p && !tryCasSuccessor(pred, c, c = p)) { + pred = p; + c = p = p.next; + } + else if (p == (p = p.next)) { + pred = null; + c = p = head; + } } - - this.lastRet = prev; - - for (Node p = prev, s, n;;) { - s = (p == null) ? head : p.next; - if (s == null) - break; - else if (s == p) { - p = null; - continue; - } - Object item = s.item; - if (s.isData) { - if (item != null && item != s) { - @SuppressWarnings("unchecked") E itemE = (E) item; - nextItem = itemE; - nextNode = s; - return; - } - } - else if (item == null) - break; - // assert s.isMatched(); - if (p == null) - p = s; - else if ((n = s.next) == null) - break; - else if (s == n) - p = null; - else - p.casNext(s, n); - } + nextItem = null; nextNode = null; - nextItem = null; } Itr() { @@ -999,25 +986,67 @@ } public final E next() { - Node p = nextNode; - if (p == null) throw new NoSuchElementException(); + final Node p; + if ((p = nextNode) == null) throw new NoSuchElementException(); E e = nextItem; - advance(p); + advance(lastRet = p); return e; } + public void forEachRemaining(Consumer action) { + Objects.requireNonNull(action); + Node q = null; + for (Node p; (p = nextNode) != null; advance(q = p)) + action.accept(nextItem); + if (q != null) + lastRet = q; + } + public final void remove() { final Node lastRet = this.lastRet; if (lastRet == null) throw new IllegalStateException(); this.lastRet = null; - if (lastRet.tryMatchData()) - unsplice(lastPred, lastRet); + if (lastRet.item == null) // already deleted? + return; + // Advance ancestor, collapsing intervening dead nodes + Node pred = ancestor; + for (Node p = (pred == null) ? head : pred.next, c = p, q; + p != null; ) { + if (p == lastRet) { + final Object item; + if ((item = p.item) != null) + p.tryMatch(item, null); + if ((q = p.next) == null) q = p; + if (c != q) tryCasSuccessor(pred, c, q); + ancestor = pred; + return; + } + final Object item; final boolean pAlive; + if (pAlive = ((item = p.item) != null && p.isData)) { + // exceptionally, nothing to do + } + else if (!p.isData && item == null) + break; + if ((c != p && !tryCasSuccessor(pred, c, c = p)) || pAlive) { + pred = p; + c = p = p.next; + } + else if (p == (p = p.next)) { + pred = null; + c = p = head; + } + } + // traversal failed to find lastRet; must have been deleted; + // leave ancestor at original location to avoid overshoot; + // better luck next time! + + // assert lastRet.isMatched(); } } /** A customized variant of Spliterators.IteratorSpliterator */ - final class LTQSpliterator implements Spliterator { + final class LTQSpliterator implements Spliterator { static final int MAX_BATCH = 1 << 25; // max batch array size; Node current; // current node; null until initialized int batch; // batch size for splits @@ -1025,79 +1054,90 @@ LTQSpliterator() {} public Spliterator trySplit() { - Node p; - int b = batch; - int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; - if (!exhausted && - ((p = current) != null || (p = firstDataNode()) != null) && - p.next != null) { - Object[] a = new Object[n]; - int i = 0; - do { - Object e = p.item; - if (e != p && (a[i] = e) != null) - ++i; - if (p == (p = p.next)) - p = firstDataNode(); - } while (p != null && i < n && p.isData); - if ((current = p) == null) - exhausted = true; - if (i > 0) { - batch = i; - return Spliterators.spliterator - (a, 0, i, (Spliterator.ORDERED | - Spliterator.NONNULL | - Spliterator.CONCURRENT)); + Node p, q; + if ((p = current()) == null || (q = p.next) == null) + return null; + int i = 0, n = batch = Math.min(batch + 1, MAX_BATCH); + Object[] a = null; + do { + final Object item = p.item; + if (p.isData) { + if (item != null) { + if (a == null) + a = new Object[n]; + a[i++] = item; + } + } else if (item == null) { + p = null; + break; } - } - return null; + if (p == (p = q)) + p = firstDataNode(); + } while (p != null && (q = p.next) != null && i < n); + setCurrent(p); + return (i == 0) ? null : + Spliterators.spliterator(a, 0, i, (Spliterator.ORDERED | + Spliterator.NONNULL | + Spliterator.CONCURRENT)); } - @SuppressWarnings("unchecked") public void forEachRemaining(Consumer action) { - Node p; - if (action == null) throw new NullPointerException(); - if (!exhausted && - ((p = current) != null || (p = firstDataNode()) != null)) { + Objects.requireNonNull(action); + final Node p; + if ((p = current()) != null) { + current = null; exhausted = true; - do { - Object e = p.item; - if (e != null && e != p) - action.accept((E)e); - if (p == (p = p.next)) - p = firstDataNode(); - } while (p != null && p.isData); + forEachFrom(action, p); } } @SuppressWarnings("unchecked") public boolean tryAdvance(Consumer action) { + Objects.requireNonNull(action); Node p; - if (action == null) throw new NullPointerException(); - if (!exhausted && - ((p = current) != null || (p = firstDataNode()) != null)) { - Object e; + if ((p = current()) != null) { + E e = null; do { - if ((e = p.item) == p) - e = null; + final Object item = p.item; + final boolean isData = p.isData; if (p == (p = p.next)) - p = firstDataNode(); - } while (e == null && p != null && p.isData); - if ((current = p) == null) - exhausted = true; + p = head; + if (isData) { + if (item != null) { + e = (E) item; + break; + } + } + else if (item == null) + p = null; + } while (p != null); + setCurrent(p); if (e != null) { - action.accept((E)e); + action.accept(e); return true; } } return false; } + private void setCurrent(Node p) { + if ((current = p) == null) + exhausted = true; + } + + private Node current() { + Node p; + if ((p = current) == null && !exhausted) + setCurrent(p = firstDataNode()); + return p; + } + public long estimateSize() { return Long.MAX_VALUE; } public int characteristics() { - return Spliterator.ORDERED | Spliterator.NONNULL | - Spliterator.CONCURRENT; + return (Spliterator.ORDERED | + Spliterator.NONNULL | + Spliterator.CONCURRENT); } } @@ -1118,7 +1158,7 @@ * @since 1.8 */ public Spliterator spliterator() { - return new LTQSpliterator(); + return new LTQSpliterator(); } /* -------------- Removal methods -------------- */ @@ -1128,10 +1168,15 @@ * the given predecessor. * * @param pred a node that was at one time known to be the - * predecessor of s, or null or s itself if s is/was at head + * predecessor of s * @param s the node to be unspliced */ final void unsplice(Node pred, Node s) { + // assert pred != null; + // assert pred != s; + // assert s != null; + // assert s.isMatched(); + // assert (SWEEP_THRESHOLD & (SWEEP_THRESHOLD - 1)) == 0; s.waiter = null; // disable signals /* * See above for rationale. Briefly: if pred still points to @@ -1140,13 +1185,13 @@ * nor s are head or offlist, add to sweepVotes, and if enough * votes have accumulated, sweep. */ - if (pred != null && pred != s && pred.next == s) { + if (pred != null && pred.next == s) { Node n = s.next; if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) { for (;;) { // check if at, or could be, head Node h = head; - if (h == pred || h == s || h == null) + if (h == pred || h == s) return; // at head or list empty if (!h.isMatched()) break; @@ -1154,21 +1199,12 @@ if (hn == null) return; // now empty if (hn != h && casHead(h, hn)) - h.forgetNext(); // advance head + h.selfLink(); // advance head } - if (pred.next != pred && s.next != s) { // recheck if offlist - for (;;) { // sweep now if enough votes - int v = sweepVotes; - if (v < SWEEP_THRESHOLD) { - if (casSweepVotes(v, v + 1)) - break; - } - else if (casSweepVotes(v, 0)) { - sweep(); - break; - } - } - } + // sweep every SWEEP_THRESHOLD votes + if (pred.next != pred && s.next != s // recheck if offlist + && (incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0) + sweep(); } } } @@ -1193,35 +1229,10 @@ } /** - * Main implementation of remove(Object) - */ - private boolean findAndRemove(Object e) { - if (e != null) { - for (Node pred = null, p = head; p != null; ) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p && e.equals(item) && - p.tryMatchData()) { - unsplice(pred, p); - return true; - } - } - else if (item == null) - break; - pred = p; - if ((p = p.next) == pred) { // stale - pred = null; - p = head; - } - } - } - return false; - } - - /** * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { + head = tail = new Node(); } /** @@ -1234,8 +1245,18 @@ * of its elements are null */ public LinkedTransferQueue(Collection c) { - this(); - addAll(c); + Node h = null, t = null; + for (E e : c) { + Node newNode = new Node(Objects.requireNonNull(e)); + if (h == null) + h = t = newNode; + else + t.appendRelaxed(t = newNode); + } + if (h == null) + h = t = new Node(); + head = h; + tail = t; } /** @@ -1367,15 +1388,12 @@ * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection c) { - if (c == null) - throw new NullPointerException(); + Objects.requireNonNull(c); if (c == this) throw new IllegalArgumentException(); int n = 0; - for (E e; (e = poll()) != null;) { + for (E e; (e = poll()) != null; n++) c.add(e); - ++n; - } return n; } @@ -1384,15 +1402,12 @@ * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection c, int maxElements) { - if (c == null) - throw new NullPointerException(); + Objects.requireNonNull(c); if (c == this) throw new IllegalArgumentException(); int n = 0; - for (E e; n < maxElements && (e = poll()) != null;) { + for (E e; n < maxElements && (e = poll()) != null; n++) c.add(e); - ++n; - } return n; } @@ -1414,7 +1429,7 @@ for (Node p = head; p != null;) { Object item = p.item; if (p.isData) { - if (item != null && item != p) { + if (item != null) { @SuppressWarnings("unchecked") E e = (E) item; return e; } @@ -1442,7 +1457,7 @@ for (Node p = head; p != null;) { Object item = p.item; if (p.isData) { - if (item != null && item != p) + if (item != null) break; } else if (item == null) @@ -1486,7 +1501,31 @@ * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { - return findAndRemove(o); + if (o == null) return false; + restartFromHead: for (;;) { + for (Node p = head, pred = null; p != null; ) { + Node q = p.next; + final Object item; + if ((item = p.item) != null) { + if (p.isData) { + if (o.equals(item) && p.tryMatch(item, null)) { + skipDeadNodes(pred, p, p, q); + return true; + } + pred = p; p = q; continue; + } + } + else if (!p.isData) + break; + for (Node c = p;; q = p.next) { + if (q == null || !q.isMatched()) { + pred = skipDeadNodes(pred, c, p, q); p = q; break; + } + if (p == (p = q)) continue restartFromHead; + } + } + return false; + } } /** @@ -1498,18 +1537,29 @@ * @return {@code true} if this queue contains the specified element */ public boolean contains(Object o) { - if (o != null) { - for (Node p = head; p != null; p = succ(p)) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p && o.equals(item)) - return true; + if (o == null) return false; + restartFromHead: for (;;) { + for (Node p = head, pred = null; p != null; ) { + Node q = p.next; + final Object item; + if ((item = p.item) != null) { + if (p.isData) { + if (o.equals(item)) + return true; + pred = p; p = q; continue; + } } - else if (item == null) + else if (!p.isData) break; + for (Node c = p;; q = p.next) { + if (q == null || !q.isMatched()) { + pred = skipDeadNodes(pred, c, p, q); p = q; break; + } + if (p == (p = q)) continue restartFromHead; + } } + return false; } - return false; } /** @@ -1550,21 +1600,136 @@ */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { - s.defaultReadObject(); - for (;;) { + + // Read in elements until trailing null sentinel found + Node h = null, t = null; + for (Object item; (item = s.readObject()) != null; ) { @SuppressWarnings("unchecked") - E item = (E) s.readObject(); - if (item == null) + Node newNode = new Node((E) item); + if (h == null) + h = t = newNode; + else + t.appendRelaxed(t = newNode); + } + if (h == null) + h = t = new Node(); + head = h; + tail = t; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean removeIf(Predicate filter) { + Objects.requireNonNull(filter); + return bulkRemove(filter); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean removeAll(Collection c) { + Objects.requireNonNull(c); + return bulkRemove(e -> c.contains(e)); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean retainAll(Collection c) { + Objects.requireNonNull(c); + return bulkRemove(e -> !c.contains(e)); + } + + public void clear() { + bulkRemove(e -> true); + } + + /** + * Tolerate this many consecutive dead nodes before CAS-collapsing. + * Amortized cost of clear() is (1 + 1/MAX_HOPS) CASes per element. + */ + private static final int MAX_HOPS = 8; + + /** Implementation of bulk remove methods. */ + @SuppressWarnings("unchecked") + private boolean bulkRemove(Predicate filter) { + boolean removed = false; + restartFromHead: for (;;) { + int hops = MAX_HOPS; + // c will be CASed to collapse intervening dead nodes between + // pred (or head if null) and p. + for (Node p = head, c = p, pred = null, q; p != null; p = q) { + q = p.next; + final Object item; boolean pAlive; + if (pAlive = ((item = p.item) != null && p.isData)) { + if (filter.test((E) item)) { + if (p.tryMatch(item, null)) + removed = true; + pAlive = false; + } + } + else if (!p.isData && item == null) + break; + if (pAlive || q == null || --hops == 0) { + // p might already be self-linked here, but if so: + // - CASing head will surely fail + // - CASing pred's next will be useless but harmless. + if ((c != p && !tryCasSuccessor(pred, c, c = p)) + || pAlive) { + // if CAS failed or alive, abandon old pred + hops = MAX_HOPS; + pred = p; + c = q; + } + } else if (p == q) + continue restartFromHead; + } + return removed; + } + } + + /** + * Runs action on each element found during a traversal starting at p. + * If p is null, the action is not run. + */ + @SuppressWarnings("unchecked") + void forEachFrom(Consumer action, Node p) { + for (Node pred = null; p != null; ) { + Node q = p.next; + final Object item; + if ((item = p.item) != null) { + if (p.isData) { + action.accept((E) item); + pred = p; p = q; continue; + } + } + else if (!p.isData) break; - else - offer(item); + for (Node c = p;; q = p.next) { + if (q == null || !q.isMatched()) { + pred = skipDeadNodes(pred, c, p, q); p = q; break; + } + if (p == (p = q)) { pred = null; p = head; break; } + } } } + /** + * @throws NullPointerException {@inheritDoc} + */ + public void forEach(Consumer action) { + Objects.requireNonNull(action); + forEachFrom(action, head); + } + // VarHandle mechanics private static final VarHandle HEAD; private static final VarHandle TAIL; private static final VarHandle SWEEPVOTES; + static final VarHandle ITEM; + static final VarHandle NEXT; + static final VarHandle WAITER; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); @@ -1574,6 +1739,9 @@ Node.class); SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes", int.class); + ITEM = l.findVarHandle(Node.class, "item", Object.class); + NEXT = l.findVarHandle(Node.class, "next", Node.class); + WAITER = l.findVarHandle(Node.class, "waiter", Thread.class); } catch (ReflectiveOperationException e) { throw new Error(e); } diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/src/java.base/share/classes/java/util/concurrent/PriorityBlockingQueue.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/PriorityBlockingQueue.java Fri Feb 03 13:24:59 2017 -0800 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/PriorityBlockingQueue.java Fri Feb 03 13:24:59 2017 -0800 @@ -43,6 +43,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.PriorityQueue; import java.util.Queue; import java.util.SortedSet; @@ -62,16 +63,15 @@ * non-comparable objects (doing so results in * {@code ClassCastException}). * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. The Iterator provided in method {@link - * #iterator()} and the Spliterator provided in method {@link #spliterator()} - * are not guaranteed to traverse the elements of - * the PriorityBlockingQueue in any particular order. If you need - * ordered traversal, consider using - * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} - * can be used to remove some or all elements in priority - * order and place them in another collection. + *

This class and its iterator implement all of the optional + * methods of the {@link Collection} and {@link Iterator} interfaces. + * The Iterator provided in method {@link #iterator()} and the + * Spliterator provided in method {@link #spliterator()} are not + * guaranteed to traverse the elements of the PriorityBlockingQueue in + * any particular order. If you need ordered traversal, consider using + * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} can + * be used to remove some or all elements in priority order and + * place them in another collection. * *

Operations on this class make no guarantees about the ordering * of elements with equal priority. If you need to enforce an @@ -437,15 +437,14 @@ */ private void heapify() { Object[] array = queue; - int n = size; - int half = (n >>> 1) - 1; + int n = size, i = (n >>> 1) - 1; Comparator cmp = comparator; if (cmp == null) { - for (int i = half; i >= 0; i--) + for (; i >= 0; i--) siftDownComparable(i, (E) array[i], array, n); } else { - for (int i = half; i >= 0; i--) + for (; i >= 0; i--) siftDownUsingComparator(i, (E) array[i], array, n, cmp); } } @@ -730,8 +729,7 @@ * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection c, int maxElements) { - if (c == null) - throw new NullPointerException(); + Objects.requireNonNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) @@ -935,21 +933,22 @@ * Immutable snapshot spliterator that binds to elements "late". */ final class PBQSpliterator implements Spliterator { - Object[] array; + Object[] array; // null until late-bound-initialized int index; int fence; + PBQSpliterator() {} + PBQSpliterator(Object[] array, int index, int fence) { this.array = array; this.index = index; this.fence = fence; } - final int getFence() { - int hi; - if ((hi = fence) < 0) - hi = fence = (array = toArray()).length; - return hi; + private int getFence() { + if (array == null) + fence = (array = toArray()).length; + return fence; } public PBQSpliterator trySplit() { @@ -958,25 +957,19 @@ new PBQSpliterator(array, lo, index = mid); } - @SuppressWarnings("unchecked") public void forEachRemaining(Consumer action) { - Object[] a; int i, hi; // hoist accesses and checks from loop - if (action == null) - throw new NullPointerException(); - if ((a = array) == null) - fence = (a = toArray()).length; - if ((hi = fence) <= a.length && - (i = index) >= 0 && i < (index = hi)) { - do { action.accept((E)a[i]); } while (++i < hi); - } + Objects.requireNonNull(action); + final int hi = getFence(), lo = index; + final Object[] a = array; + index = hi; // ensure exhaustion + for (int i = lo; i < hi; i++) + action.accept((E) a[i]); } public boolean tryAdvance(Consumer action) { - if (action == null) - throw new NullPointerException(); + Objects.requireNonNull(action); if (getFence() > index && index >= 0) { - @SuppressWarnings("unchecked") E e = (E) array[index++]; - action.accept(e); + action.accept((E) array[index++]); return true; } return false; @@ -985,7 +978,9 @@ public long estimateSize() { return getFence() - index; } public int characteristics() { - return Spliterator.NONNULL | Spliterator.SIZED | Spliterator.SUBSIZED; + return (Spliterator.NONNULL | + Spliterator.SIZED | + Spliterator.SUBSIZED); } } @@ -1007,7 +1002,7 @@ * @since 1.8 */ public Spliterator spliterator() { - return new PBQSpliterator(null, 0, -1); + return new PBQSpliterator(); } // VarHandle mechanics diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/test/java/util/Collection/RemoveMicroBenchmark.java --- a/jdk/test/java/util/Collection/RemoveMicroBenchmark.java Fri Feb 03 13:24:59 2017 -0800 +++ b/jdk/test/java/util/Collection/RemoveMicroBenchmark.java Fri Feb 03 13:24:59 2017 -0800 @@ -254,7 +254,7 @@ // "iterations=%d size=%d, warmup=%1g, filter=\"%s\"%n", // iterations, size, warmupSeconds, filter); - final ArrayList al = new ArrayList(size); + final ArrayList al = new ArrayList<>(size); // Populate collections with random data final ThreadLocalRandom rnd = ThreadLocalRandom.current(); @@ -333,7 +333,7 @@ Supplier> supplier, ArrayList al) { return List.of( - new Job(description + " .removeIf") { + new Job(description + " removeIf") { public void work() throws Throwable { Collection x = supplier.get(); int[] sum = new int[1]; @@ -342,7 +342,21 @@ x.addAll(al); x.removeIf(n -> { sum[0] += n; return true; }); check.sum(sum[0]);}}}, - new Job(description + " .removeAll") { + new Job(description + " removeIf rnd-two-pass") { + public void work() throws Throwable { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + Collection x = supplier.get(); + int[] sum = new int[1]; + for (int i = 0; i < iterations; i++) { + sum[0] = 0; + x.addAll(al); + x.removeIf(n -> { + boolean b = rnd.nextBoolean(); + if (b) sum[0] += n; + return b; }); + x.removeIf(n -> { sum[0] += n; return true; }); + check.sum(sum[0]);}}}, + new Job(description + " removeAll") { public void work() throws Throwable { Collection x = supplier.get(); int[] sum = new int[1]; @@ -352,7 +366,7 @@ x.addAll(al); x.removeAll(universe); check.sum(sum[0]);}}}, - new Job(description + " .retainAll") { + new Job(description + " retainAll") { public void work() throws Throwable { Collection x = supplier.get(); int[] sum = new int[1]; @@ -375,6 +389,28 @@ it.remove(); } check.sum(sum[0]);}}}, + new Job(description + " Iterator.remove-rnd-two-pass") { + public void work() throws Throwable { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + Collection x = supplier.get(); + int[] sum = new int[1]; + for (int i = 0; i < iterations; i++) { + sum[0] = 0; + x.addAll(al); + for (Iterator it = x.iterator(); + it.hasNext(); ) { + Integer e = it.next(); + if (rnd.nextBoolean()) { + sum[0] += e; + it.remove(); + } + } + for (Iterator it = x.iterator(); + it.hasNext(); ) { + sum[0] += it.next(); + it.remove(); + } + check.sum(sum[0]);}}}, new Job(description + " clear") { public void work() throws Throwable { Collection x = supplier.get(); diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/test/java/util/concurrent/ConcurrentLinkedQueue/WhiteBox.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/util/concurrent/ConcurrentLinkedQueue/WhiteBox.java Fri Feb 03 13:24:59 2017 -0800 @@ -0,0 +1,355 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Martin Buchholz with assistance from members of JCP + * JSR-166 Expert Group and released to the public domain, as + * explained at http://creativecommons.org/publicdomain/zero/1.0/ + */ + +/* + * @test + * @modules java.base/java.util.concurrent:open + * @run testng WhiteBox + * @summary White box tests of implementation details + */ + +import static org.testng.Assert.*; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadLocalRandom; +import static java.util.stream.Collectors.toList; +import java.util.function.Consumer; +import java.util.function.Function; + +@Test +public class WhiteBox { + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final VarHandle HEAD, TAIL, ITEM, NEXT; + + WhiteBox() throws ReflectiveOperationException { + Class qClass = ConcurrentLinkedQueue.class; + Class nodeClass = Class.forName(qClass.getName() + "$Node"); + MethodHandles.Lookup lookup + = MethodHandles.privateLookupIn(qClass, MethodHandles.lookup()); + HEAD = lookup.findVarHandle(qClass, "head", nodeClass); + TAIL = lookup.findVarHandle(qClass, "tail", nodeClass); + NEXT = lookup.findVarHandle(nodeClass, "next", nodeClass); + ITEM = lookup.findVarHandle(nodeClass, "item", Object.class); + } + + Object head(ConcurrentLinkedQueue q) { return HEAD.getVolatile(q); } + Object tail(ConcurrentLinkedQueue q) { return TAIL.getVolatile(q); } + Object item(Object node) { return ITEM.getVolatile(node); } + Object next(Object node) { return NEXT.getVolatile(node); } + + int nodeCount(ConcurrentLinkedQueue q) { + int i = 0; + for (Object p = head(q); p != null; ) { + i++; + if (p == (p = next(p))) p = head(q); + } + return i; + } + + void assertIsSelfLinked(Object node) { + assertSame(next(node), node); + assertNull(item(node)); + } + + void assertIsNotSelfLinked(Object node) { + assertNotSame(node, next(node)); + } + + @Test + public void addRemove() { + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + assertInvariants(q); + assertNull(item(head(q))); + assertEquals(nodeCount(q), 1); + q.add(1); + assertEquals(nodeCount(q), 2); + assertInvariants(q); + q.remove(1); + assertEquals(nodeCount(q), 1); + assertInvariants(q); + } + + /** + * Traversal actions that visit every node and do nothing, but + * have side effect of squeezing out dead nodes. + */ + @DataProvider + public Object[][] traversalActions() { + return List.>of( + q -> q.forEach(e -> {}), + q -> assertFalse(q.contains(new Object())), + q -> assertFalse(q.remove(new Object())), + q -> q.spliterator().forEachRemaining(e -> {}), + q -> q.stream().collect(toList()), + q -> assertFalse(q.removeIf(e -> false)), + q -> assertFalse(q.removeAll(List.of()))) + .stream().map(x -> new Object[]{ x }).toArray(Object[][]::new); + } + + @Test(dataProvider = "traversalActions") + public void traversalOperationsCollapseNodes( + Consumer traversalAction) { + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + Object oldHead; + int n = 1 + rnd.nextInt(5); + for (int i = 0; i < n; i++) q.add(i); + assertInvariants(q); + assertEquals(nodeCount(q), n + 1); + oldHead = head(q); + traversalAction.accept(q); // collapses head node + assertIsSelfLinked(oldHead); + assertInvariants(q); + assertEquals(nodeCount(q), n); + // Iterator.remove does not currently try to collapse dead nodes + for (Iterator it = q.iterator(); it.hasNext(); ) { + it.next(); + it.remove(); + } + assertEquals(nodeCount(q), n); + assertInvariants(q); + oldHead = head(q); + traversalAction.accept(q); // collapses all nodes + if (n > 1) assertIsSelfLinked(oldHead); + assertEquals(nodeCount(q), 1); + assertInvariants(q); + + for (int i = 0; i < n + 1; i++) q.add(i); + assertEquals(nodeCount(q), n + 2); + oldHead = head(q); + assertEquals(0, q.poll()); // 2 leading nodes collapsed + assertIsSelfLinked(oldHead); + assertEquals(nodeCount(q), n); + assertTrue(q.remove(n)); + assertEquals(nodeCount(q), n); + traversalAction.accept(q); // trailing node is never collapsed + } + + @Test(dataProvider = "traversalActions") + public void traversalOperationsCollapseLeadingNodes( + Consumer traversalAction) { + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + Object oldHead; + int n = 1 + rnd.nextInt(5); + for (int i = 0; i < n; i++) q.add(i); + assertEquals(nodeCount(q), n + 1); + oldHead = head(q); + traversalAction.accept(q); + assertInvariants(q); + assertEquals(nodeCount(q), n); + assertIsSelfLinked(oldHead); + } + + @Test(dataProvider = "traversalActions") + public void traversalOperationsDoNotSelfLinkInteriorNodes( + Consumer traversalAction) { + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + int c; + int n = 3 + rnd.nextInt(3); + for (int i = 0; i < n; i++) q.add(i); + Object oneNode; + for (oneNode = head(q); + ! (item(oneNode) != null && item(oneNode).equals(1)); + oneNode = next(oneNode)) + ; + Object next = next(oneNode); + c = nodeCount(q); + for (Iterator it = q.iterator(); it.hasNext(); ) + if (it.next().equals(1)) it.remove(); + assertEquals(nodeCount(q), c - 1); // iterator detached head! + assertNull(item(oneNode)); + assertSame(next, next(oneNode)); + assertInvariants(q); + c = nodeCount(q); + traversalAction.accept(q); + assertEquals(nodeCount(q), c - 1); + assertSame(next, next(oneNode)); // un-linked, but not self-linked + } + + /** + * Checks that traversal operations collapse a random pattern of + * dead nodes as could normally only occur with a race. + */ + @Test(dataProvider = "traversalActions") + public void traversalOperationsCollapseRandomNodes( + Consumer traversalAction) { + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + int n = rnd.nextInt(6); + for (int i = 0; i < n; i++) q.add(i); + ArrayList nulledOut = new ArrayList(); + for (Object p = head(q); p != null; p = next(p)) + if (item(p) != null && rnd.nextBoolean()) { + nulledOut.add(item(p)); + ITEM.setVolatile(p, null); + } + traversalAction.accept(q); + int c = nodeCount(q); + assertEquals(q.size(), c - (q.contains(n - 1) ? 0 : 1)); + for (int i = 0; i < n; i++) + assertTrue(nulledOut.contains(i) ^ q.contains(i)); + } + + /** + * Traversal actions that remove every element, and are also + * expected to squeeze out dead nodes. + */ + @DataProvider + public Object[][] bulkRemovalActions() { + return List.>of( + q -> q.clear(), + q -> assertTrue(q.removeIf(e -> true)), + q -> assertTrue(q.retainAll(List.of()))) + .stream().map(x -> new Object[]{ x }).toArray(Object[][]::new); + } + + @Test(dataProvider = "bulkRemovalActions") + public void bulkRemovalOperationsCollapseNodes( + Consumer bulkRemovalAction) { + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + int n = 1 + rnd.nextInt(5); + for (int i = 0; i < n; i++) q.add(i); + bulkRemovalAction.accept(q); + assertEquals(nodeCount(q), 1); + assertInvariants(q); + } + + /** + * Actions that remove the first element, and are expected to + * leave at most one slack dead node at head. + */ + @DataProvider + public Object[][] pollActions() { + return List.>of( + q -> assertNotNull(q.poll()), + q -> assertNotNull(q.remove())) + .stream().map(x -> new Object[]{ x }).toArray(Object[][]::new); + } + + @Test(dataProvider = "pollActions") + public void pollActionsOneNodeSlack( + Consumer pollAction) { + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + int n = 1 + rnd.nextInt(5); + for (int i = 0; i < n; i++) q.add(i); + assertEquals(nodeCount(q), n + 1); + for (int i = 0; i < n; i++) { + int c = nodeCount(q); + boolean slack = item(head(q)) == null; + if (slack) assertNotNull(item(next(head(q)))); + pollAction.accept(q); + assertEquals(nodeCount(q), q.isEmpty() ? 1 : c - (slack ? 2 : 0)); + } + assertInvariants(q); + } + + /** + * Actions that append an element, and are expected to + * leave at most one slack node at tail. + */ + @DataProvider + public Object[][] addActions() { + return List.>of( + q -> q.add(1), + q -> q.offer(1)) + .stream().map(x -> new Object[]{ x }).toArray(Object[][]::new); + } + + @Test(dataProvider = "addActions") + public void addActionsOneNodeSlack( + Consumer addAction) { + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + int n = 1 + rnd.nextInt(5); + for (int i = 0; i < n; i++) { + boolean slack = next(tail(q)) != null; + addAction.accept(q); + if (slack) + assertNull(next(tail(q))); + else { + assertNotNull(next(tail(q))); + assertNull(next(next(tail(q)))); + } + assertInvariants(q); + } + } + + byte[] serialBytes(Object o) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(o); + oos.flush(); + oos.close(); + return bos.toByteArray(); + } catch (Exception fail) { + throw new AssertionError(fail); + } + } + + @SuppressWarnings("unchecked") + T serialClone(T o) { + try { + ObjectInputStream ois = new ObjectInputStream + (new ByteArrayInputStream(serialBytes(o))); + T clone = (T) ois.readObject(); + assertNotSame(o, clone); + assertSame(o.getClass(), clone.getClass()); + return clone; + } catch (Exception fail) { + throw new AssertionError(fail); + } + } + + public void testSerialization() { + ConcurrentLinkedQueue q = serialClone(new ConcurrentLinkedQueue()); + assertInvariants(q); + } + + /** Checks conditions which should always be true. */ + void assertInvariants(ConcurrentLinkedQueue q) { + assertNotNull(head(q)); + assertNotNull(tail(q)); + // head is never self-linked (but tail may!) + for (Object h; next(h = head(q)) == h; ) + assertNotSame(h, head(q)); // must be update race + } +} diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/test/java/util/concurrent/LinkedTransferQueue/WhiteBox.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/util/concurrent/LinkedTransferQueue/WhiteBox.java Fri Feb 03 13:24:59 2017 -0800 @@ -0,0 +1,408 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Martin Buchholz with assistance from members of JCP + * JSR-166 Expert Group and released to the public domain, as + * explained at http://creativecommons.org/publicdomain/zero/1.0/ + */ + +/* + * @test + * @modules java.base/java.util.concurrent:open + * @run testng WhiteBox + * @summary White box tests of implementation details + */ + +import static org.testng.Assert.*; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import static java.util.stream.Collectors.toList; +import java.util.function.Consumer; +import java.util.function.Function; + +@Test +public class WhiteBox { + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final VarHandle HEAD, TAIL, ITEM, NEXT; + final int SWEEP_THRESHOLD; + + public WhiteBox() throws ReflectiveOperationException { + Class qClass = LinkedTransferQueue.class; + Class nodeClass = Class.forName(qClass.getName() + "$Node"); + MethodHandles.Lookup lookup + = MethodHandles.privateLookupIn(qClass, MethodHandles.lookup()); + HEAD = lookup.findVarHandle(qClass, "head", nodeClass); + TAIL = lookup.findVarHandle(qClass, "tail", nodeClass); + NEXT = lookup.findVarHandle(nodeClass, "next", nodeClass); + ITEM = lookup.findVarHandle(nodeClass, "item", Object.class); + SWEEP_THRESHOLD = (int) + lookup.findStaticVarHandle(qClass, "SWEEP_THRESHOLD", int.class) + .get(); + } + + Object head(LinkedTransferQueue q) { return HEAD.getVolatile(q); } + Object tail(LinkedTransferQueue q) { return TAIL.getVolatile(q); } + Object item(Object node) { return ITEM.getVolatile(node); } + Object next(Object node) { return NEXT.getVolatile(node); } + + int nodeCount(LinkedTransferQueue q) { + int i = 0; + for (Object p = head(q); p != null; ) { + i++; + if (p == (p = next(p))) p = head(q); + } + return i; + } + + int tailCount(LinkedTransferQueue q) { + int i = 0; + for (Object p = tail(q); p != null; ) { + i++; + if (p == (p = next(p))) p = head(q); + } + return i; + } + + Object findNode(LinkedTransferQueue q, Object e) { + for (Object p = head(q); p != null; ) { + if (item(p) != null && e.equals(item(p))) + return p; + if (p == (p = next(p))) p = head(q); + } + throw new AssertionError("not found"); + } + + Iterator iteratorAt(LinkedTransferQueue q, Object e) { + for (Iterator it = q.iterator(); it.hasNext(); ) + if (it.next().equals(e)) + return it; + throw new AssertionError("not found"); + } + + void assertIsSelfLinked(Object node) { + assertSame(next(node), node); + assertNull(item(node)); + } + void assertIsNotSelfLinked(Object node) { + assertNotSame(node, next(node)); + } + + @Test + public void addRemove() { + LinkedTransferQueue q = new LinkedTransferQueue(); + assertInvariants(q); + assertNull(next(head(q))); + assertNull(item(head(q))); + q.add(1); + assertEquals(nodeCount(q), 2); + assertInvariants(q); + q.remove(1); + assertEquals(nodeCount(q), 1); + assertInvariants(q); + } + + /** + * Traversal actions that visit every node and do nothing, but + * have side effect of squeezing out dead nodes. + */ + @DataProvider + public Object[][] traversalActions() { + return List.>of( + q -> q.forEach(e -> {}), + q -> assertFalse(q.contains(new Object())), + q -> assertFalse(q.remove(new Object())), + q -> q.spliterator().forEachRemaining(e -> {}), + q -> q.stream().collect(toList()), + q -> assertFalse(q.removeIf(e -> false)), + q -> assertFalse(q.removeAll(List.of()))) + .stream().map(x -> new Object[]{ x }).toArray(Object[][]::new); + } + + @Test(dataProvider = "traversalActions") + public void traversalOperationsCollapseLeadingNodes( + Consumer traversalAction) { + LinkedTransferQueue q = new LinkedTransferQueue(); + Object oldHead; + int n = 1 + rnd.nextInt(5); + for (int i = 0; i < n; i++) q.add(i); + assertEquals(nodeCount(q), n + 1); + oldHead = head(q); + traversalAction.accept(q); + assertInvariants(q); + assertEquals(nodeCount(q), n); + assertIsSelfLinked(oldHead); + } + + @Test(dataProvider = "traversalActions") + public void traversalOperationsCollapseInteriorNodes( + Consumer traversalAction) { + LinkedTransferQueue q = new LinkedTransferQueue(); + int n = 6; + for (int i = 0; i < n; i++) q.add(i); + + // We must be quite devious to reliably create an interior dead node + Object p0 = findNode(q, 0); + Object p1 = findNode(q, 1); + Object p2 = findNode(q, 2); + Object p3 = findNode(q, 3); + Object p4 = findNode(q, 4); + Object p5 = findNode(q, 5); + + Iterator it1 = iteratorAt(q, 1); + Iterator it2 = iteratorAt(q, 2); + + it2.remove(); // causes it2's ancestor to advance to 1 + assertSame(next(p1), p3); + assertSame(next(p2), p3); + assertNull(item(p2)); + it1.remove(); // removes it2's ancestor + assertSame(next(p0), p3); + assertSame(next(p1), p3); + assertSame(next(p2), p3); + assertNull(item(p1)); + assertEquals(it2.next(), 3); + it2.remove(); // it2's ancestor can't unlink + + assertSame(next(p0), p3); // p3 is now interior dead node + assertSame(next(p1), p4); // it2 uselessly CASed p1.next + assertSame(next(p2), p3); + assertSame(next(p3), p4); + assertInvariants(q); + + int c = nodeCount(q); + traversalAction.accept(q); + assertEquals(nodeCount(q), c - 1); + + assertSame(next(p0), p4); + assertSame(next(p1), p4); + assertSame(next(p2), p3); + assertSame(next(p3), p4); + assertInvariants(q); + + // trailing nodes are not unlinked + Iterator it5 = iteratorAt(q, 5); it5.remove(); + traversalAction.accept(q); + assertSame(next(p4), p5); + assertNull(next(p5)); + assertEquals(nodeCount(q), c - 1); + } + + /** + * Checks that traversal operations collapse a random pattern of + * dead nodes as could normally only occur with a race. + */ + @Test(dataProvider = "traversalActions") + public void traversalOperationsCollapseRandomNodes( + Consumer traversalAction) { + LinkedTransferQueue q = new LinkedTransferQueue(); + int n = rnd.nextInt(6); + for (int i = 0; i < n; i++) q.add(i); + ArrayList nulledOut = new ArrayList(); + for (Object p = head(q); p != null; p = next(p)) + if (rnd.nextBoolean()) { + nulledOut.add(item(p)); + ITEM.setVolatile(p, null); + } + traversalAction.accept(q); + int c = nodeCount(q); + assertEquals(q.size(), c - (q.contains(n - 1) ? 0 : 1)); + for (int i = 0; i < n; i++) + assertTrue(nulledOut.contains(i) ^ q.contains(i)); + } + + /** + * Traversal actions that remove every element, and are also + * expected to squeeze out dead nodes. + */ + @DataProvider + public Object[][] bulkRemovalActions() { + return List.>of( + q -> q.clear(), + q -> assertTrue(q.removeIf(e -> true)), + q -> assertTrue(q.retainAll(List.of()))) + .stream().map(x -> new Object[]{ x }).toArray(Object[][]::new); + } + + @Test(dataProvider = "bulkRemovalActions") + public void bulkRemovalOperationsCollapseNodes( + Consumer bulkRemovalAction) { + LinkedTransferQueue q = new LinkedTransferQueue(); + int n = 1 + rnd.nextInt(5); + for (int i = 0; i < n; i++) q.add(i); + bulkRemovalAction.accept(q); + assertEquals(nodeCount(q), 1); + assertInvariants(q); + } + + /** + * Actions that remove the first element, and are expected to + * leave at most one slack dead node at head. + */ + @DataProvider + public Object[][] pollActions() { + return List.>of( + q -> assertNotNull(q.poll()), + q -> { try { assertNotNull(q.poll(1L, TimeUnit.DAYS)); } + catch (Throwable x) { throw new AssertionError(x); }}, + q -> { try { assertNotNull(q.take()); } + catch (Throwable x) { throw new AssertionError(x); }}, + q -> assertNotNull(q.remove())) + .stream().map(x -> new Object[]{ x }).toArray(Object[][]::new); + } + + @Test(dataProvider = "pollActions") + public void pollActionsOneNodeSlack( + Consumer pollAction) { + LinkedTransferQueue q = new LinkedTransferQueue(); + int n = 1 + rnd.nextInt(5); + for (int i = 0; i < n; i++) q.add(i); + assertEquals(nodeCount(q), n + 1); + for (int i = 0; i < n; i++) { + int c = nodeCount(q); + boolean slack = item(head(q)) == null; + if (slack) assertNotNull(item(next(head(q)))); + pollAction.accept(q); + assertEquals(nodeCount(q), q.isEmpty() ? 1 : c - (slack ? 2 : 0)); + } + assertInvariants(q); + } + + /** + * Actions that append an element, and are expected to + * leave at most one slack node at tail. + */ + @DataProvider + public Object[][] addActions() { + return List.>of( + q -> q.add(1), + q -> q.offer(1)) + .stream().map(x -> new Object[]{ x }).toArray(Object[][]::new); + } + + @Test(dataProvider = "addActions") + public void addActionsOneNodeSlack( + Consumer addAction) { + LinkedTransferQueue q = new LinkedTransferQueue(); + int n = 1 + rnd.nextInt(9); + for (int i = 0; i < n; i++) { + boolean slack = next(tail(q)) != null; + addAction.accept(q); + if (slack) + assertNull(next(tail(q))); + else { + assertNotNull(next(tail(q))); + assertNull(next(next(tail(q)))); + } + assertInvariants(q); + } + } + + byte[] serialBytes(Object o) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(o); + oos.flush(); + oos.close(); + return bos.toByteArray(); + } catch (Exception fail) { + throw new AssertionError(fail); + } + } + + @SuppressWarnings("unchecked") + T serialClone(T o) { + try { + ObjectInputStream ois = new ObjectInputStream + (new ByteArrayInputStream(serialBytes(o))); + T clone = (T) ois.readObject(); + assertNotSame(o, clone); + assertSame(o.getClass(), clone.getClass()); + return clone; + } catch (Exception fail) { + throw new AssertionError(fail); + } + } + + public void testSerialization() { + LinkedTransferQueue q = serialClone(new LinkedTransferQueue()); + assertInvariants(q); + } + + public void cancelledNodeSweeping() throws Throwable { + assertEquals(SWEEP_THRESHOLD & (SWEEP_THRESHOLD - 1), 0); + LinkedTransferQueue q = new LinkedTransferQueue(); + Thread blockHead = null; + if (rnd.nextBoolean()) { + blockHead = new Thread( + () -> { try { q.take(); } catch (InterruptedException ok) {}}); + blockHead.start(); + while (nodeCount(q) != 2) { Thread.yield(); } + assertTrue(q.hasWaitingConsumer()); + assertEquals(q.getWaitingConsumerCount(), 1); + } + int initialNodeCount = nodeCount(q); + + // Some dead nodes do in fact accumulate ... + if (blockHead != null) + while (nodeCount(q) < initialNodeCount + SWEEP_THRESHOLD / 2) + q.poll(1L, TimeUnit.MICROSECONDS); + + // ... but no more than SWEEP_THRESHOLD nodes accumulate + for (int i = rnd.nextInt(SWEEP_THRESHOLD * 10); i-->0; ) + q.poll(1L, TimeUnit.MICROSECONDS); + assertTrue(nodeCount(q) <= initialNodeCount + SWEEP_THRESHOLD); + + if (blockHead != null) { + blockHead.interrupt(); + blockHead.join(); + } + } + + /** Checks conditions which should always be true. */ + void assertInvariants(LinkedTransferQueue q) { + assertNotNull(head(q)); + assertNotNull(tail(q)); + // head is never self-linked (but tail may!) + for (Object h; next(h = head(q)) == h; ) + assertNotSame(h, head(q)); // must be update race + } +} diff -r 65d31ea930aa -r 60e247b8d9a4 jdk/test/java/util/concurrent/tck/Collection8Test.java --- a/jdk/test/java/util/concurrent/tck/Collection8Test.java Fri Feb 03 13:24:59 2017 -0800 +++ b/jdk/test/java/util/concurrent/tck/Collection8Test.java Fri Feb 03 13:24:59 2017 -0800 @@ -754,6 +754,31 @@ } /** + * Concurrent Spliterators, once exhausted, stay exhausted. + */ + public void testStickySpliteratorExhaustion() throws Throwable { + if (!impl.isConcurrent()) return; + if (!testImplementationDetails) return; + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final Consumer alwaysThrows = e -> { throw new AssertionError(); }; + final Collection c = impl.emptyCollection(); + final Spliterator s = c.spliterator(); + if (rnd.nextBoolean()) { + assertFalse(s.tryAdvance(alwaysThrows)); + } else { + s.forEachRemaining(alwaysThrows); + } + final Object one = impl.makeElement(1); + // Spliterator should not notice added element + c.add(one); + if (rnd.nextBoolean()) { + assertFalse(s.tryAdvance(alwaysThrows)); + } else { + s.forEachRemaining(alwaysThrows); + } + } + + /** * Motley crew of threads concurrently randomly hammer the collection. */ public void testDetectRaces() throws Throwable {