--- a/jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingQueue.java Wed Dec 21 11:54:42 2016 -0800
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingQueue.java Wed Dec 21 14:22:53 2016 -0800
@@ -39,6 +39,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
@@ -234,14 +235,6 @@
putLock.unlock();
}
-// /**
-// * Tells whether both locks are held by current thread.
-// */
-// boolean isFullyLocked() {
-// return (putLock.isHeldByCurrentThread() &&
-// takeLock.isHeldByCurrentThread());
-// }
-
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
@@ -517,7 +510,8 @@
* Unlinks interior Node p with predecessor trail.
*/
void unlink(Node<E> p, Node<E> trail) {
- // assert isFullyLocked();
+ // assert putLock.isHeldByCurrentThread();
+ // assert takeLock.isHeldByCurrentThread();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
@@ -701,8 +695,7 @@
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
- if (c == null)
- throw new NullPointerException();
+ Objects.requireNonNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
@@ -741,6 +734,16 @@
}
/**
+ * Used for any element traversal that is not entirely under lock.
+ * Such traversals must handle both:
+ * - dequeued nodes (p.next == p)
+ * - (possibly multiple) interior removed nodes (p.item == null)
+ */
+ Node<E> succ(Node<E> p) {
+ return (p == (p = p.next)) ? head.next : p;
+ }
+
+ /**
* Returns an iterator over the elements in this queue in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
*
@@ -760,48 +763,80 @@
* still have it to return even if lost race with a take etc.
*/
- private Node<E> current;
+ private Node<E> next;
+ private E nextItem;
private Node<E> lastRet;
- private E currentElement;
Itr() {
fullyLock();
try {
- current = head.next;
- if (current != null)
- currentElement = current.item;
+ if ((next = head.next) != null)
+ nextItem = next.item;
} finally {
fullyUnlock();
}
}
public boolean hasNext() {
- return current != null;
+ return next != null;
}
public E next() {
+ Node<E> p;
+ if ((p = next) == null)
+ throw new NoSuchElementException();
+ lastRet = p;
+ E x = nextItem;
fullyLock();
try {
- if (current == null)
- throw new NoSuchElementException();
- lastRet = current;
- E item = null;
- // Unlike other traversal methods, iterators must handle both:
- // - dequeued nodes (p.next == p)
- // - (possibly multiple) interior removed nodes (p.item == null)
- for (Node<E> p = current, q;; p = q) {
- if ((q = p.next) == p)
- q = head.next;
- if (q == null || (item = q.item) != null) {
- current = q;
- E x = currentElement;
- currentElement = item;
- return x;
- }
- }
+ E e = null;
+ for (p = p.next; p != null && (e = p.item) == null; )
+ p = succ(p);
+ next = p;
+ nextItem = e;
} finally {
fullyUnlock();
}
+ return x;
+ }
+
+ public void forEachRemaining(Consumer<? super E> action) {
+ // A variant of forEachFrom
+ Objects.requireNonNull(action);
+ Node<E> p;
+ if ((p = next) == null) return;
+ lastRet = p;
+ next = null;
+ final int batchSize = 32;
+ Object[] es = null;
+ int n, len = 1;
+ do {
+ fullyLock();
+ try {
+ if (es == null) {
+ p = p.next;
+ for (Node<E> q = p; q != null; q = succ(q))
+ if (q.item != null && ++len == batchSize)
+ break;
+ es = new Object[len];
+ es[0] = nextItem;
+ nextItem = null;
+ n = 1;
+ } else
+ n = 0;
+ for (; p != null && n < len; p = succ(p))
+ if ((es[n] = p.item) != null) {
+ lastRet = p;
+ n++;
+ }
+ } finally {
+ fullyUnlock();
+ }
+ for (int i = 0; i < n; i++) {
+ @SuppressWarnings("unchecked") E e = (E) es[i];
+ action.accept(e);
+ }
+ } while (n > 0 && p != null);
}
public void remove() {
@@ -825,42 +860,39 @@
}
}
- /** A customized variant of Spliterators.IteratorSpliterator */
- static final class LBQSpliterator<E> implements Spliterator<E> {
+ /**
+ * A customized variant of Spliterators.IteratorSpliterator.
+ * Keep this class in sync with (very similar) LBDSpliterator.
+ */
+ private final class LBQSpliterator implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
- final LinkedBlockingQueue<E> queue;
Node<E> current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
- long est; // size estimate
- LBQSpliterator(LinkedBlockingQueue<E> queue) {
- this.queue = queue;
- this.est = queue.size();
- }
+ long est = size(); // size estimate
+
+ LBQSpliterator() {}
public long estimateSize() { return est; }
public Spliterator<E> trySplit() {
Node<E> h;
- final LinkedBlockingQueue<E> q = this.queue;
int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted &&
- ((h = current) != null || (h = q.head.next) != null) &&
- h.next != null) {
+ ((h = current) != null || (h = head.next) != null)
+ && h.next != null) {
Object[] a = new Object[n];
int i = 0;
Node<E> p = current;
- q.fullyLock();
+ fullyLock();
try {
- if (p != null || (p = q.head.next) != null) {
- do {
+ if (p != null || (p = head.next) != null)
+ for (; p != null && i < n; p = succ(p))
if ((a[i] = p.item) != null)
- ++i;
- } while ((p = p.next) != null && i < n);
- }
+ i++;
} finally {
- q.fullyUnlock();
+ fullyUnlock();
}
if ((current = p) == null) {
est = 0L;
@@ -879,53 +911,22 @@
return null;
}
- public void forEachRemaining(Consumer<? super E> action) {
- if (action == null) throw new NullPointerException();
- final LinkedBlockingQueue<E> q = this.queue;
- if (!exhausted) {
- exhausted = true;
- Node<E> p = current;
- do {
- E e = null;
- q.fullyLock();
- try {
- if (p == null)
- p = q.head.next;
- while (p != null) {
- e = p.item;
- p = p.next;
- if (e != null)
- break;
- }
- } finally {
- q.fullyUnlock();
- }
- if (e != null)
- action.accept(e);
- } while (p != null);
- }
- }
-
public boolean tryAdvance(Consumer<? super E> action) {
- if (action == null) throw new NullPointerException();
- final LinkedBlockingQueue<E> q = this.queue;
+ Objects.requireNonNull(action);
if (!exhausted) {
E e = null;
- q.fullyLock();
+ fullyLock();
try {
- if (current == null)
- current = q.head.next;
- while (current != null) {
- e = current.item;
- current = current.next;
- if (e != null)
- break;
- }
+ Node<E> p;
+ if ((p = current) != null || (p = head.next) != null)
+ do {
+ e = p.item;
+ p = succ(p);
+ } while (e == null && p != null);
+ exhausted = ((current = p) == null);
} finally {
- q.fullyUnlock();
+ fullyUnlock();
}
- if (current == null)
- exhausted = true;
if (e != null) {
action.accept(e);
return true;
@@ -934,9 +935,20 @@
return false;
}
+ public void forEachRemaining(Consumer<? super E> action) {
+ Objects.requireNonNull(action);
+ if (!exhausted) {
+ exhausted = true;
+ Node<E> p = current;
+ current = null;
+ forEachFrom(action, p);
+ }
+ }
+
public int characteristics() {
- return Spliterator.ORDERED | Spliterator.NONNULL |
- Spliterator.CONCURRENT;
+ return (Spliterator.ORDERED |
+ Spliterator.NONNULL |
+ Spliterator.CONCURRENT);
}
}
@@ -957,7 +969,48 @@
* @since 1.8
*/
public Spliterator<E> spliterator() {
- return new LBQSpliterator<E>(this);
+ return new LBQSpliterator();
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public void forEach(Consumer<? super E> action) {
+ Objects.requireNonNull(action);
+ forEachFrom(action, null);
+ }
+
+ /**
+ * Runs action on each element found during a traversal starting at p.
+ * If p is null, traversal starts at head.
+ */
+ void forEachFrom(Consumer<? super E> action, Node<E> p) {
+ // Extract batches of elements while holding the lock; then
+ // run the action on the elements while not
+ final int batchSize = 32; // max number of elements per batch
+ Object[] es = null; // container for batch of elements
+ int n, len = 0;
+ do {
+ fullyLock();
+ try {
+ if (es == null) {
+ if (p == null) p = head.next;
+ for (Node<E> q = p; q != null; q = succ(q))
+ if (q.item != null && ++len == batchSize)
+ break;
+ es = new Object[len];
+ }
+ for (n = 0; p != null && n < len; p = succ(p))
+ if ((es[n] = p.item) != null)
+ n++;
+ } finally {
+ fullyUnlock();
+ }
+ for (int i = 0; i < n; i++) {
+ @SuppressWarnings("unchecked") E e = (E) es[i];
+ action.accept(e);
+ }
+ } while (n > 0 && p != null);
}
/**