--- a/jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingDeque.java Wed Dec 21 11:54:42 2016 -0800
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingDeque.java Wed Dec 21 14:22:53 2016 -0800
@@ -39,6 +39,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.locks.Condition;
@@ -740,8 +741,7 @@
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
- if (c == null)
- throw new NullPointerException();
+ Objects.requireNonNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
@@ -986,6 +986,16 @@
}
/**
+ * Used for any element traversal that is not entirely under lock.
+ * Such traversals must handle both:
+ * - dequeued nodes (p.next == p)
+ * - (possibly multiple) interior removed nodes (p.item == null)
+ */
+ Node<E> succ(Node<E> p) {
+ return (p == (p = p.next)) ? first : p;
+ }
+
+ /**
* Returns an iterator over the elements in this deque in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
*
@@ -1024,8 +1034,8 @@
/**
* nextItem holds on to item fields because once we claim that
* an element exists in hasNext(), we must return item read
- * under lock (in advance()) even if it was in the process of
- * being removed when hasNext() was called.
+ * under lock even if it was in the process of being removed
+ * when hasNext() was called.
*/
E nextItem;
@@ -1038,48 +1048,17 @@
abstract Node<E> firstNode();
abstract Node<E> nextNode(Node<E> n);
+ private Node<E> succ(Node<E> p) {
+ return (p == (p = nextNode(p))) ? firstNode() : p;
+ }
+
AbstractItr() {
// set to initial position
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
- next = firstNode();
- nextItem = (next == null) ? null : next.item;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Returns the successor node of the given non-null, but
- * possibly previously deleted, node.
- */
- private Node<E> succ(Node<E> n) {
- // Chains of deleted nodes ending in null or self-links
- // are possible if multiple interior nodes are removed.
- for (;;) {
- Node<E> s = nextNode(n);
- if (s == null)
- return null;
- else if (s.item != null)
- return s;
- else if (s == n)
- return firstNode();
- else
- n = s;
- }
- }
-
- /**
- * Advances next.
- */
- void advance() {
- final ReentrantLock lock = LinkedBlockingDeque.this.lock;
- lock.lock();
- try {
- // assert next != null;
- next = succ(next);
- nextItem = (next == null) ? null : next.item;
+ if ((next = firstNode()) != null)
+ nextItem = next.item;
} finally {
lock.unlock();
}
@@ -1090,14 +1069,65 @@
}
public E next() {
- if (next == null)
+ Node<E> p;
+ if ((p = next) == null)
throw new NoSuchElementException();
- lastRet = next;
+ lastRet = p;
E x = nextItem;
- advance();
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+ lock.lock();
+ try {
+ E e = null;
+ for (p = nextNode(p); p != null && (e = p.item) == null; )
+ p = succ(p);
+ next = p;
+ nextItem = e;
+ } finally {
+ lock.unlock();
+ }
return x;
}
+ public void forEachRemaining(Consumer<? super E> action) {
+ // A variant of forEachFrom
+ Objects.requireNonNull(action);
+ Node<E> p;
+ if ((p = next) == null) return;
+ lastRet = p;
+ next = null;
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+ final int batchSize = 32;
+ Object[] es = null;
+ int n, len = 1;
+ do {
+ lock.lock();
+ try {
+ if (es == null) {
+ p = nextNode(p);
+ for (Node<E> q = p; q != null; q = succ(q))
+ if (q.item != null && ++len == batchSize)
+ break;
+ es = new Object[len];
+ es[0] = nextItem;
+ nextItem = null;
+ n = 1;
+ } else
+ n = 0;
+ for (; p != null && n < len; p = succ(p))
+ if ((es[n] = p.item) != null) {
+ lastRet = p;
+ n++;
+ }
+ } finally {
+ lock.unlock();
+ }
+ for (int i = 0; i < n; i++) {
+ @SuppressWarnings("unchecked") E e = (E) es[i];
+ action.accept(e);
+ }
+ } while (n > 0 && p != null);
+ }
+
public void remove() {
Node<E> n = lastRet;
if (n == null)
@@ -1116,25 +1146,30 @@
/** Forward iterator */
private class Itr extends AbstractItr {
+ Itr() {} // prevent access constructor creation
Node<E> firstNode() { return first; }
Node<E> nextNode(Node<E> n) { return n.next; }
}
/** Descending iterator */
private class DescendingItr extends AbstractItr {
+ DescendingItr() {} // prevent access constructor creation
Node<E> firstNode() { return last; }
Node<E> nextNode(Node<E> n) { return n.prev; }
}
- /** A customized variant of Spliterators.IteratorSpliterator */
+ /**
+ * A customized variant of Spliterators.IteratorSpliterator.
+ * Keep this class in sync with (very similar) LBQSpliterator.
+ */
private final class LBDSpliterator implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
Node<E> current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
- long est; // size estimate
+ long est = size(); // size estimate
- LBDSpliterator() { est = size(); }
+ LBDSpliterator() {}
public long estimateSize() { return est; }
@@ -1143,8 +1178,7 @@
int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted &&
- (((h = current) != null && h != h.next)
- || (h = first) != null)
+ ((h = current) != null || (h = first) != null)
&& h.next != null) {
Object[] a = new Object[n];
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
@@ -1152,10 +1186,10 @@
Node<E> p = current;
lock.lock();
try {
- if (((p != null && p != p.next) || (p = first) != null)
- && p.item != null)
- for (; p != null && i < n; p = p.next)
- a[i++] = p.item;
+ if (p != null || (p = first) != null)
+ for (; p != null && i < n; p = succ(p))
+ if ((a[i] = p.item) != null)
+ i++;
} finally {
lock.unlock();
}
@@ -1176,51 +1210,39 @@
return null;
}
- public void forEachRemaining(Consumer<? super E> action) {
- if (action == null) throw new NullPointerException();
- if (exhausted)
- return;
- exhausted = true;
- final ReentrantLock lock = LinkedBlockingDeque.this.lock;
- Node<E> p = current;
- current = null;
- do {
+ public boolean tryAdvance(Consumer<? super E> action) {
+ Objects.requireNonNull(action);
+ if (!exhausted) {
E e = null;
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
- if ((p != null && p != p.next) || (p = first) != null) {
- e = p.item;
- p = p.next;
- }
+ Node<E> p;
+ if ((p = current) != null || (p = first) != null)
+ do {
+ e = p.item;
+ p = succ(p);
+ } while (e == null && p != null);
+ exhausted = ((current = p) == null);
} finally {
lock.unlock();
}
- if (e != null)
+ if (e != null) {
action.accept(e);
- } while (p != null);
+ return true;
+ }
+ }
+ return false;
}
- public boolean tryAdvance(Consumer<? super E> action) {
- if (action == null) throw new NullPointerException();
- if (exhausted)
- return false;
- final ReentrantLock lock = LinkedBlockingDeque.this.lock;
- Node<E> p = current;
- E e = null;
- lock.lock();
- try {
- if ((p != null && p != p.next) || (p = first) != null) {
- e = p.item;
- p = p.next;
- }
- } finally {
- lock.unlock();
+ public void forEachRemaining(Consumer<? super E> action) {
+ Objects.requireNonNull(action);
+ if (!exhausted) {
+ exhausted = true;
+ Node<E> p = current;
+ current = null;
+ forEachFrom(action, p);
}
- exhausted = ((current = p) == null);
- if (e == null)
- return false;
- action.accept(e);
- return true;
}
public int characteristics() {
@@ -1251,6 +1273,48 @@
}
/**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public void forEach(Consumer<? super E> action) {
+ Objects.requireNonNull(action);
+ forEachFrom(action, null);
+ }
+
+ /**
+ * Runs action on each element found during a traversal starting at p.
+ * If p is null, traversal starts at head.
+ */
+ void forEachFrom(Consumer<? super E> action, Node<E> p) {
+ // Extract batches of elements while holding the lock; then
+ // run the action on the elements while not
+ final ReentrantLock lock = this.lock;
+ final int batchSize = 32; // max number of elements per batch
+ Object[] es = null; // container for batch of elements
+ int n, len = 0;
+ do {
+ lock.lock();
+ try {
+ if (es == null) {
+ if (p == null) p = first;
+ for (Node<E> q = p; q != null; q = succ(q))
+ if (q.item != null && ++len == batchSize)
+ break;
+ es = new Object[len];
+ }
+ for (n = 0; p != null && n < len; p = succ(p))
+ if ((es[n] = p.item) != null)
+ n++;
+ } finally {
+ lock.unlock();
+ }
+ for (int i = 0; i < n; i++) {
+ @SuppressWarnings("unchecked") E e = (E) es[i];
+ action.accept(e);
+ }
+ } while (n > 0 && p != null);
+ }
+
+ /**
* Saves this deque to a stream (that is, serializes it).
*
* @param s the stream
@@ -1290,8 +1354,7 @@
last = null;
// Read in all elements and place in queue
for (;;) {
- @SuppressWarnings("unchecked")
- E item = (E)s.readObject();
+ @SuppressWarnings("unchecked") E item = (E)s.readObject();
if (item == null)
break;
add(item);