--- a/jdk/src/java.base/share/classes/java/util/concurrent/ArrayBlockingQueue.java Mon Nov 28 23:33:25 2016 -0800
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/ArrayBlockingQueue.java Mon Nov 28 23:36:11 2016 -0800
@@ -46,6 +46,8 @@
import java.util.Spliterators;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
/**
* A bounded {@linkplain BlockingQueue blocking queue} backed by an
@@ -85,6 +87,11 @@
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
+ /*
+ * Much of the implementation mechanics, especially the unusual
+ * nested loops, are shared and co-maintained with ArrayDeque.
+ */
+
/**
* Serialization ID. This class relies on default serialization
* even for the items array, which is default-serialized, even if
@@ -129,10 +136,21 @@
// Internal helper methods
/**
- * Circularly decrements array index i.
+ * Increments i, mod modulus.
+ * Precondition and postcondition: 0 <= i < modulus.
*/
- final int dec(int i) {
- return ((i == 0) ? items.length : i) - 1;
+ static final int inc(int i, int modulus) {
+ if (++i >= modulus) i = 0;
+ return i;
+ }
+
+ /**
+ * Decrements i, mod modulus.
+ * Precondition and postcondition: 0 <= i < modulus.
+ */
+ static final int dec(int i, int modulus) {
+ if (--i < 0) i = modulus - 1;
+ return i;
}
/**
@@ -144,14 +162,24 @@
}
/**
+ * Returns element at array index i.
+ * This is a slight abuse of generics, accepted by javac.
+ */
+ @SuppressWarnings("unchecked")
+ static <E> E itemAt(Object[] items, int i) {
+ return (E) items[i];
+ }
+
+ /**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
- private void enqueue(E x) {
+ private void enqueue(E e) {
+ // assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
- items[putIndex] = x;
+ items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
@@ -162,18 +190,19 @@
* Call only when holding lock.
*/
private E dequeue() {
+ // assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
- E x = (E) items[takeIndex];
+ E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
- return x;
+ return e;
}
/**
@@ -182,6 +211,7 @@
* Call only when holding lock.
*/
void removeAt(final int removeIndex) {
+ // assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
@@ -267,6 +297,7 @@
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
+ final Object[] items = this.items;
int i = 0;
try {
for (E e : c)
@@ -481,15 +512,16 @@
try {
if (count > 0) {
final Object[] items = this.items;
- final int putIndex = this.putIndex;
- int i = takeIndex;
- do {
- if (o.equals(items[i])) {
- removeAt(i);
- return true;
- }
- if (++i == items.length) i = 0;
- } while (i != putIndex);
+ for (int i = takeIndex, end = putIndex,
+ to = (i < end) ? end : items.length;
+ ; i = 0, to = end) {
+ for (; i < to; i++)
+ if (o.equals(items[i])) {
+ removeAt(i);
+ return true;
+ }
+ if (to == end) break;
+ }
}
return false;
} finally {
@@ -512,13 +544,14 @@
try {
if (count > 0) {
final Object[] items = this.items;
- final int putIndex = this.putIndex;
- int i = takeIndex;
- do {
- if (o.equals(items[i]))
- return true;
- if (++i == items.length) i = 0;
- } while (i != putIndex);
+ for (int i = takeIndex, end = putIndex,
+ to = (i < end) ? end : items.length;
+ ; i = 0, to = end) {
+ for (; i < to; i++)
+ if (o.equals(items[i]))
+ return true;
+ if (to == end) break;
+ }
}
return false;
} finally {
@@ -625,15 +658,9 @@
final ReentrantLock lock = this.lock;
lock.lock();
try {
- int k = count;
- if (k > 0) {
- final Object[] items = this.items;
- final int putIndex = this.putIndex;
- int i = takeIndex;
- do {
- items[i] = null;
- if (++i == items.length) i = 0;
- } while (i != putIndex);
+ int k;
+ if ((k = count) > 0) {
+ circularClear(items, takeIndex, putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
@@ -647,6 +674,18 @@
}
/**
+ * Nulls out slots starting at array index i, upto index end.
+ * If i == end, the entire array is cleared!
+ */
+ private static void circularClear(Object[] items, int i, int end) {
+ for (int to = (i < end) ? end : items.length;
+ ; i = 0, to = end) {
+ Arrays.fill(items, i, to, null);
+ if (to == end) break;
+ }
+ }
+
+ /**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
@@ -678,8 +717,8 @@
try {
while (i < n) {
@SuppressWarnings("unchecked")
- E x = (E) items[take];
- c.add(x);
+ E e = (E) items[take];
+ c.add(e);
items[take] = null;
if (++take == items.length) take = 0;
i++;
@@ -808,7 +847,7 @@
* there is known to be at least one iterator to collect
*/
void doSomeSweeping(boolean tryHarder) {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
// assert head != null;
int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
Node o, p;
@@ -864,7 +903,7 @@
* Adds a new iterator to the linked list of tracked iterators.
*/
void register(Itr itr) {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
head = new Node(itr, head);
}
@@ -874,7 +913,7 @@
* Notifies all iterators, and expunges any that are now stale.
*/
void takeIndexWrapped() {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
cycles++;
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
@@ -931,7 +970,7 @@
* clears all weak refs, and unlinks the itrs datastructure.
*/
void queueIsEmpty() {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
for (Node p = head; p != null; p = p.next) {
Itr it = p.get();
if (it != null) {
@@ -947,7 +986,7 @@
* Called whenever an element has been dequeued (at takeIndex).
*/
void elementDequeued() {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
if (count == 0)
queueIsEmpty();
else if (takeIndex == 0)
@@ -1008,7 +1047,6 @@
private static final int DETACHED = -3;
Itr() {
- // assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
@@ -1041,12 +1079,12 @@
}
boolean isDetached() {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
return prevTakeIndex < 0;
}
private int incCursor(int index) {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
if (++index == items.length) index = 0;
if (index == putIndex) index = NONE;
return index;
@@ -1071,7 +1109,7 @@
* operation on this iterator. Call only from iterating thread.
*/
private void incorporateDequeues() {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
// assert itrs != null;
// assert !isDetached();
// assert count > 0;
@@ -1114,7 +1152,7 @@
*/
private void detach() {
// Switch to detached mode
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
// assert cursor == NONE;
// assert nextIndex < 0;
// assert lastRet < 0 || nextItem == null;
@@ -1134,7 +1172,6 @@
* triggered by queue modifications.
*/
public boolean hasNext() {
- // assert lock.getHoldCount() == 0;
if (nextItem != null)
return true;
noNext();
@@ -1164,9 +1201,8 @@
}
public E next() {
- // assert lock.getHoldCount() == 0;
- final E x = nextItem;
- if (x == null)
+ final E e = nextItem;
+ if (e == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
@@ -1188,13 +1224,43 @@
} finally {
lock.unlock();
}
- return x;
+ return e;
+ }
+
+ public void forEachRemaining(Consumer<? super E> action) {
+ Objects.requireNonNull(action);
+ final ReentrantLock lock = ArrayBlockingQueue.this.lock;
+ lock.lock();
+ try {
+ final E e = nextItem;
+ if (e == null) return;
+ if (!isDetached())
+ incorporateDequeues();
+ action.accept(e);
+ if (isDetached() || cursor < 0) return;
+ final Object[] items = ArrayBlockingQueue.this.items;
+ for (int i = cursor, end = putIndex,
+ to = (i < end) ? end : items.length;
+ ; i = 0, to = end) {
+ for (; i < to; i++)
+ action.accept(itemAt(items, i));
+ if (to == end) break;
+ }
+ } finally {
+ // Calling forEachRemaining is a strong hint that this
+ // iteration is surely over; supporting remove() after
+ // forEachRemaining() is more trouble than it's worth
+ cursor = nextIndex = lastRet = NONE;
+ nextItem = lastItem = null;
+ detach();
+ lock.unlock();
+ }
}
public void remove() {
- // assert lock.getHoldCount() == 0;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
+ // assert lock.getHoldCount() == 1;
try {
if (!isDetached())
incorporateDequeues(); // might update lastRet or detach
@@ -1232,7 +1298,7 @@
* from next(), as promised by returning true from hasNext().
*/
void shutdown() {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
cursor = NONE;
if (nextIndex >= 0)
nextIndex = REMOVED;
@@ -1260,7 +1326,7 @@
* @return true if this iterator should be unlinked from itrs
*/
boolean removedAt(int removedIndex) {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
if (isDetached())
return true;
@@ -1285,7 +1351,7 @@
}
else if (x > removedDistance) {
// assert cursor != prevTakeIndex;
- this.cursor = cursor = dec(cursor);
+ this.cursor = cursor = dec(cursor, len);
}
}
int lastRet = this.lastRet;
@@ -1294,7 +1360,7 @@
if (x == removedDistance)
this.lastRet = lastRet = REMOVED;
else if (x > removedDistance)
- this.lastRet = lastRet = dec(lastRet);
+ this.lastRet = lastRet = dec(lastRet, len);
}
int nextIndex = this.nextIndex;
if (nextIndex >= 0) {
@@ -1302,7 +1368,7 @@
if (x == removedDistance)
this.nextIndex = nextIndex = REMOVED;
else if (x > removedDistance)
- this.nextIndex = nextIndex = dec(nextIndex);
+ this.nextIndex = nextIndex = dec(nextIndex, len);
}
if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
this.prevTakeIndex = DETACHED;
@@ -1317,7 +1383,7 @@
* @return true if this iterator should be unlinked from itrs
*/
boolean takeIndexWrapped() {
- // assert lock.getHoldCount() == 1;
+ // assert lock.isHeldByCurrentThread();
if (isDetached())
return true;
if (itrs.cycles - prevCycles > 1) {
@@ -1366,4 +1432,170 @@
Spliterator.CONCURRENT));
}
+ public void forEach(Consumer<? super E> action) {
+ Objects.requireNonNull(action);
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ if (count > 0) {
+ final Object[] items = this.items;
+ for (int i = takeIndex, end = putIndex,
+ to = (i < end) ? end : items.length;
+ ; i = 0, to = end) {
+ for (; i < to; i++)
+ action.accept(itemAt(items, i));
+ if (to == end) break;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public boolean removeIf(Predicate<? super E> filter) {
+ Objects.requireNonNull(filter);
+ return bulkRemove(filter);
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public boolean removeAll(Collection<?> c) {
+ Objects.requireNonNull(c);
+ return bulkRemove(e -> c.contains(e));
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public boolean retainAll(Collection<?> c) {
+ Objects.requireNonNull(c);
+ return bulkRemove(e -> !c.contains(e));
+ }
+
+ /** Implementation of bulk remove methods. */
+ private boolean bulkRemove(Predicate<? super E> filter) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ if (itrs == null) { // check for active iterators
+ if (count > 0) {
+ final Object[] items = this.items;
+ // Optimize for initial run of survivors
+ for (int i = takeIndex, end = putIndex,
+ to = (i < end) ? end : items.length;
+ ; i = 0, to = end) {
+ for (; i < to; i++)
+ if (filter.test(itemAt(items, i)))
+ return bulkRemoveModified(filter, i);
+ if (to == end) break;
+ }
+ }
+ return false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ // Active iterators are too hairy!
+ // Punting (for now) to the slow n^2 algorithm ...
+ return super.removeIf(filter);
+ }
+
+ // A tiny bit set implementation
+
+ private static long[] nBits(int n) {
+ return new long[((n - 1) >> 6) + 1];
+ }
+ private static void setBit(long[] bits, int i) {
+ bits[i >> 6] |= 1L << i;
+ }
+ private static boolean isClear(long[] bits, int i) {
+ return (bits[i >> 6] & (1L << i)) == 0;
+ }
+
+ /**
+ * Returns circular distance from i to j, disambiguating i == j to
+ * items.length; never returns 0.
+ */
+ private int distanceNonEmpty(int i, int j) {
+ if ((j -= i) <= 0) j += items.length;
+ return j;
+ }
+
+ /**
+ * Helper for bulkRemove, in case of at least one deletion.
+ * Tolerate predicates that reentrantly access the collection for
+ * read (but not write), so traverse once to find elements to
+ * delete, a second pass to physically expunge.
+ *
+ * @param beg valid index of first element to be deleted
+ */
+ private boolean bulkRemoveModified(
+ Predicate<? super E> filter, final int beg) {
+ final Object[] es = items;
+ final int capacity = items.length;
+ final int end = putIndex;
+ final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
+ deathRow[0] = 1L; // set bit 0
+ for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
+ ; i = 0, to = end, k -= capacity) {
+ for (; i < to; i++)
+ if (filter.test(itemAt(es, i)))
+ setBit(deathRow, i - k);
+ if (to == end) break;
+ }
+ // a two-finger traversal, with hare i reading, tortoise w writing
+ int w = beg;
+ for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
+ ; w = 0) { // w rejoins i on second leg
+ // In this loop, i and w are on the same leg, with i > w
+ for (; i < to; i++)
+ if (isClear(deathRow, i - k))
+ es[w++] = es[i];
+ if (to == end) break;
+ // In this loop, w is on the first leg, i on the second
+ for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
+ if (isClear(deathRow, i - k))
+ es[w++] = es[i];
+ if (i >= to) {
+ if (w == capacity) w = 0; // "corner" case
+ break;
+ }
+ }
+ count -= distanceNonEmpty(w, end);
+ circularClear(es, putIndex = w, end);
+ return true;
+ }
+
+ /** debugging */
+ void checkInvariants() {
+ // meta-assertions
+ // assert lock.isHeldByCurrentThread();
+ try {
+ // Unlike ArrayDeque, we have a count field but no spare slot.
+ // We prefer ArrayDeque's strategy (and the names of its fields!),
+ // but our field layout is baked into the serial form, and so is
+ // too annoying to change.
+ //
+ // putIndex == takeIndex must be disambiguated by checking count.
+ int capacity = items.length;
+ // assert capacity > 0;
+ // assert takeIndex >= 0 && takeIndex < capacity;
+ // assert putIndex >= 0 && putIndex < capacity;
+ // assert count <= capacity;
+ // assert takeIndex == putIndex || items[takeIndex] != null;
+ // assert count == capacity || items[putIndex] == null;
+ // assert takeIndex == putIndex || items[dec(putIndex, capacity)] != null;
+ } catch (Throwable t) {
+ System.err.printf("takeIndex=%d putIndex=%d count=%d capacity=%d%n",
+ takeIndex, putIndex, count, items.length);
+ System.err.printf("items=%s%n",
+ Arrays.toString(items));
+ throw t;
+ }
+ }
+
}