jdk/src/java.base/share/classes/java/util/concurrent/ArrayBlockingQueue.java
changeset 42319 0193886267c3
parent 32991 b27c76b82713
child 42927 1d31e540bfcb
--- a/jdk/src/java.base/share/classes/java/util/concurrent/ArrayBlockingQueue.java	Mon Nov 28 23:33:25 2016 -0800
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/ArrayBlockingQueue.java	Mon Nov 28 23:36:11 2016 -0800
@@ -46,6 +46,8 @@
 import java.util.Spliterators;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 /**
  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
@@ -85,6 +87,11 @@
 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
         implements BlockingQueue<E>, java.io.Serializable {
 
+    /*
+     * Much of the implementation mechanics, especially the unusual
+     * nested loops, are shared and co-maintained with ArrayDeque.
+     */
+
     /**
      * Serialization ID. This class relies on default serialization
      * even for the items array, which is default-serialized, even if
@@ -129,10 +136,21 @@
     // Internal helper methods
 
     /**
-     * Circularly decrements array index i.
+     * Increments i, mod modulus.
+     * Precondition and postcondition: 0 <= i < modulus.
      */
-    final int dec(int i) {
-        return ((i == 0) ? items.length : i) - 1;
+    static final int inc(int i, int modulus) {
+        if (++i >= modulus) i = 0;
+        return i;
+    }
+
+    /**
+     * Decrements i, mod modulus.
+     * Precondition and postcondition: 0 <= i < modulus.
+     */
+    static final int dec(int i, int modulus) {
+        if (--i < 0) i = modulus - 1;
+        return i;
     }
 
     /**
@@ -144,14 +162,24 @@
     }
 
     /**
+     * Returns element at array index i.
+     * This is a slight abuse of generics, accepted by javac.
+     */
+    @SuppressWarnings("unchecked")
+    static <E> E itemAt(Object[] items, int i) {
+        return (E) items[i];
+    }
+
+    /**
      * Inserts element at current put position, advances, and signals.
      * Call only when holding lock.
      */
-    private void enqueue(E x) {
+    private void enqueue(E e) {
+        // assert lock.isHeldByCurrentThread();
         // assert lock.getHoldCount() == 1;
         // assert items[putIndex] == null;
         final Object[] items = this.items;
-        items[putIndex] = x;
+        items[putIndex] = e;
         if (++putIndex == items.length) putIndex = 0;
         count++;
         notEmpty.signal();
@@ -162,18 +190,19 @@
      * Call only when holding lock.
      */
     private E dequeue() {
+        // assert lock.isHeldByCurrentThread();
         // assert lock.getHoldCount() == 1;
         // assert items[takeIndex] != null;
         final Object[] items = this.items;
         @SuppressWarnings("unchecked")
-        E x = (E) items[takeIndex];
+        E e = (E) items[takeIndex];
         items[takeIndex] = null;
         if (++takeIndex == items.length) takeIndex = 0;
         count--;
         if (itrs != null)
             itrs.elementDequeued();
         notFull.signal();
-        return x;
+        return e;
     }
 
     /**
@@ -182,6 +211,7 @@
      * Call only when holding lock.
      */
     void removeAt(final int removeIndex) {
+        // assert lock.isHeldByCurrentThread();
         // assert lock.getHoldCount() == 1;
         // assert items[removeIndex] != null;
         // assert removeIndex >= 0 && removeIndex < items.length;
@@ -267,6 +297,7 @@
         final ReentrantLock lock = this.lock;
         lock.lock(); // Lock only for visibility, not mutual exclusion
         try {
+            final Object[] items = this.items;
             int i = 0;
             try {
                 for (E e : c)
@@ -481,15 +512,16 @@
         try {
             if (count > 0) {
                 final Object[] items = this.items;
-                final int putIndex = this.putIndex;
-                int i = takeIndex;
-                do {
-                    if (o.equals(items[i])) {
-                        removeAt(i);
-                        return true;
-                    }
-                    if (++i == items.length) i = 0;
-                } while (i != putIndex);
+                for (int i = takeIndex, end = putIndex,
+                         to = (i < end) ? end : items.length;
+                     ; i = 0, to = end) {
+                    for (; i < to; i++)
+                        if (o.equals(items[i])) {
+                            removeAt(i);
+                            return true;
+                        }
+                    if (to == end) break;
+                }
             }
             return false;
         } finally {
@@ -512,13 +544,14 @@
         try {
             if (count > 0) {
                 final Object[] items = this.items;
-                final int putIndex = this.putIndex;
-                int i = takeIndex;
-                do {
-                    if (o.equals(items[i]))
-                        return true;
-                    if (++i == items.length) i = 0;
-                } while (i != putIndex);
+                for (int i = takeIndex, end = putIndex,
+                         to = (i < end) ? end : items.length;
+                     ; i = 0, to = end) {
+                    for (; i < to; i++)
+                        if (o.equals(items[i]))
+                            return true;
+                    if (to == end) break;
+                }
             }
             return false;
         } finally {
@@ -625,15 +658,9 @@
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
-            int k = count;
-            if (k > 0) {
-                final Object[] items = this.items;
-                final int putIndex = this.putIndex;
-                int i = takeIndex;
-                do {
-                    items[i] = null;
-                    if (++i == items.length) i = 0;
-                } while (i != putIndex);
+            int k;
+            if ((k = count) > 0) {
+                circularClear(items, takeIndex, putIndex);
                 takeIndex = putIndex;
                 count = 0;
                 if (itrs != null)
@@ -647,6 +674,18 @@
     }
 
     /**
+     * Nulls out slots starting at array index i, upto index end.
+     * If i == end, the entire array is cleared!
+     */
+    private static void circularClear(Object[] items, int i, int end) {
+        for (int to = (i < end) ? end : items.length;
+             ; i = 0, to = end) {
+            Arrays.fill(items, i, to, null);
+            if (to == end) break;
+        }
+    }
+
+    /**
      * @throws UnsupportedOperationException {@inheritDoc}
      * @throws ClassCastException            {@inheritDoc}
      * @throws NullPointerException          {@inheritDoc}
@@ -678,8 +717,8 @@
             try {
                 while (i < n) {
                     @SuppressWarnings("unchecked")
-                    E x = (E) items[take];
-                    c.add(x);
+                    E e = (E) items[take];
+                    c.add(e);
                     items[take] = null;
                     if (++take == items.length) take = 0;
                     i++;
@@ -808,7 +847,7 @@
          * there is known to be at least one iterator to collect
          */
         void doSomeSweeping(boolean tryHarder) {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             // assert head != null;
             int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
             Node o, p;
@@ -864,7 +903,7 @@
          * Adds a new iterator to the linked list of tracked iterators.
          */
         void register(Itr itr) {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             head = new Node(itr, head);
         }
 
@@ -874,7 +913,7 @@
          * Notifies all iterators, and expunges any that are now stale.
          */
         void takeIndexWrapped() {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             cycles++;
             for (Node o = null, p = head; p != null;) {
                 final Itr it = p.get();
@@ -931,7 +970,7 @@
          * clears all weak refs, and unlinks the itrs datastructure.
          */
         void queueIsEmpty() {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             for (Node p = head; p != null; p = p.next) {
                 Itr it = p.get();
                 if (it != null) {
@@ -947,7 +986,7 @@
          * Called whenever an element has been dequeued (at takeIndex).
          */
         void elementDequeued() {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             if (count == 0)
                 queueIsEmpty();
             else if (takeIndex == 0)
@@ -1008,7 +1047,6 @@
         private static final int DETACHED = -3;
 
         Itr() {
-            // assert lock.getHoldCount() == 0;
             lastRet = NONE;
             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
             lock.lock();
@@ -1041,12 +1079,12 @@
         }
 
         boolean isDetached() {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             return prevTakeIndex < 0;
         }
 
         private int incCursor(int index) {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             if (++index == items.length) index = 0;
             if (index == putIndex) index = NONE;
             return index;
@@ -1071,7 +1109,7 @@
          * operation on this iterator.  Call only from iterating thread.
          */
         private void incorporateDequeues() {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             // assert itrs != null;
             // assert !isDetached();
             // assert count > 0;
@@ -1114,7 +1152,7 @@
          */
         private void detach() {
             // Switch to detached mode
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             // assert cursor == NONE;
             // assert nextIndex < 0;
             // assert lastRet < 0 || nextItem == null;
@@ -1134,7 +1172,6 @@
          * triggered by queue modifications.
          */
         public boolean hasNext() {
-            // assert lock.getHoldCount() == 0;
             if (nextItem != null)
                 return true;
             noNext();
@@ -1164,9 +1201,8 @@
         }
 
         public E next() {
-            // assert lock.getHoldCount() == 0;
-            final E x = nextItem;
-            if (x == null)
+            final E e = nextItem;
+            if (e == null)
                 throw new NoSuchElementException();
             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
             lock.lock();
@@ -1188,13 +1224,43 @@
             } finally {
                 lock.unlock();
             }
-            return x;
+            return e;
+        }
+
+        public void forEachRemaining(Consumer<? super E> action) {
+            Objects.requireNonNull(action);
+            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
+            lock.lock();
+            try {
+                final E e = nextItem;
+                if (e == null) return;
+                if (!isDetached())
+                    incorporateDequeues();
+                action.accept(e);
+                if (isDetached() || cursor < 0) return;
+                final Object[] items = ArrayBlockingQueue.this.items;
+                for (int i = cursor, end = putIndex,
+                         to = (i < end) ? end : items.length;
+                     ; i = 0, to = end) {
+                    for (; i < to; i++)
+                        action.accept(itemAt(items, i));
+                    if (to == end) break;
+                }
+            } finally {
+                // Calling forEachRemaining is a strong hint that this
+                // iteration is surely over; supporting remove() after
+                // forEachRemaining() is more trouble than it's worth
+                cursor = nextIndex = lastRet = NONE;
+                nextItem = lastItem = null;
+                detach();
+                lock.unlock();
+            }
         }
 
         public void remove() {
-            // assert lock.getHoldCount() == 0;
             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
             lock.lock();
+            // assert lock.getHoldCount() == 1;
             try {
                 if (!isDetached())
                     incorporateDequeues(); // might update lastRet or detach
@@ -1232,7 +1298,7 @@
          * from next(), as promised by returning true from hasNext().
          */
         void shutdown() {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             cursor = NONE;
             if (nextIndex >= 0)
                 nextIndex = REMOVED;
@@ -1260,7 +1326,7 @@
          * @return true if this iterator should be unlinked from itrs
          */
         boolean removedAt(int removedIndex) {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             if (isDetached())
                 return true;
 
@@ -1285,7 +1351,7 @@
                 }
                 else if (x > removedDistance) {
                     // assert cursor != prevTakeIndex;
-                    this.cursor = cursor = dec(cursor);
+                    this.cursor = cursor = dec(cursor, len);
                 }
             }
             int lastRet = this.lastRet;
@@ -1294,7 +1360,7 @@
                 if (x == removedDistance)
                     this.lastRet = lastRet = REMOVED;
                 else if (x > removedDistance)
-                    this.lastRet = lastRet = dec(lastRet);
+                    this.lastRet = lastRet = dec(lastRet, len);
             }
             int nextIndex = this.nextIndex;
             if (nextIndex >= 0) {
@@ -1302,7 +1368,7 @@
                 if (x == removedDistance)
                     this.nextIndex = nextIndex = REMOVED;
                 else if (x > removedDistance)
-                    this.nextIndex = nextIndex = dec(nextIndex);
+                    this.nextIndex = nextIndex = dec(nextIndex, len);
             }
             if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
                 this.prevTakeIndex = DETACHED;
@@ -1317,7 +1383,7 @@
          * @return true if this iterator should be unlinked from itrs
          */
         boolean takeIndexWrapped() {
-            // assert lock.getHoldCount() == 1;
+            // assert lock.isHeldByCurrentThread();
             if (isDetached())
                 return true;
             if (itrs.cycles - prevCycles > 1) {
@@ -1366,4 +1432,170 @@
                     Spliterator.CONCURRENT));
     }
 
+    public void forEach(Consumer<? super E> action) {
+        Objects.requireNonNull(action);
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            if (count > 0) {
+                final Object[] items = this.items;
+                for (int i = takeIndex, end = putIndex,
+                         to = (i < end) ? end : items.length;
+                     ; i = 0, to = end) {
+                    for (; i < to; i++)
+                        action.accept(itemAt(items, i));
+                    if (to == end) break;
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @throws NullPointerException {@inheritDoc}
+     */
+    public boolean removeIf(Predicate<? super E> filter) {
+        Objects.requireNonNull(filter);
+        return bulkRemove(filter);
+    }
+
+    /**
+     * @throws NullPointerException {@inheritDoc}
+     */
+    public boolean removeAll(Collection<?> c) {
+        Objects.requireNonNull(c);
+        return bulkRemove(e -> c.contains(e));
+    }
+
+    /**
+     * @throws NullPointerException {@inheritDoc}
+     */
+    public boolean retainAll(Collection<?> c) {
+        Objects.requireNonNull(c);
+        return bulkRemove(e -> !c.contains(e));
+    }
+
+    /** Implementation of bulk remove methods. */
+    private boolean bulkRemove(Predicate<? super E> filter) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            if (itrs == null) { // check for active iterators
+                if (count > 0) {
+                    final Object[] items = this.items;
+                    // Optimize for initial run of survivors
+                    for (int i = takeIndex, end = putIndex,
+                             to = (i < end) ? end : items.length;
+                         ; i = 0, to = end) {
+                        for (; i < to; i++)
+                            if (filter.test(itemAt(items, i)))
+                                return bulkRemoveModified(filter, i);
+                        if (to == end) break;
+                    }
+                }
+                return false;
+            }
+        } finally {
+            lock.unlock();
+        }
+        // Active iterators are too hairy!
+        // Punting (for now) to the slow n^2 algorithm ...
+        return super.removeIf(filter);
+    }
+
+    // A tiny bit set implementation
+
+    private static long[] nBits(int n) {
+        return new long[((n - 1) >> 6) + 1];
+    }
+    private static void setBit(long[] bits, int i) {
+        bits[i >> 6] |= 1L << i;
+    }
+    private static boolean isClear(long[] bits, int i) {
+        return (bits[i >> 6] & (1L << i)) == 0;
+    }
+
+    /**
+     * Returns circular distance from i to j, disambiguating i == j to
+     * items.length; never returns 0.
+     */
+    private int distanceNonEmpty(int i, int j) {
+        if ((j -= i) <= 0) j += items.length;
+        return j;
+    }
+
+    /**
+     * Helper for bulkRemove, in case of at least one deletion.
+     * Tolerate predicates that reentrantly access the collection for
+     * read (but not write), so traverse once to find elements to
+     * delete, a second pass to physically expunge.
+     *
+     * @param beg valid index of first element to be deleted
+     */
+    private boolean bulkRemoveModified(
+        Predicate<? super E> filter, final int beg) {
+        final Object[] es = items;
+        final int capacity = items.length;
+        final int end = putIndex;
+        final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
+        deathRow[0] = 1L;   // set bit 0
+        for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
+             ; i = 0, to = end, k -= capacity) {
+            for (; i < to; i++)
+                if (filter.test(itemAt(es, i)))
+                    setBit(deathRow, i - k);
+            if (to == end) break;
+        }
+        // a two-finger traversal, with hare i reading, tortoise w writing
+        int w = beg;
+        for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
+             ; w = 0) { // w rejoins i on second leg
+            // In this loop, i and w are on the same leg, with i > w
+            for (; i < to; i++)
+                if (isClear(deathRow, i - k))
+                    es[w++] = es[i];
+            if (to == end) break;
+            // In this loop, w is on the first leg, i on the second
+            for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
+                if (isClear(deathRow, i - k))
+                    es[w++] = es[i];
+            if (i >= to) {
+                if (w == capacity) w = 0; // "corner" case
+                break;
+            }
+        }
+        count -= distanceNonEmpty(w, end);
+        circularClear(es, putIndex = w, end);
+        return true;
+    }
+
+    /** debugging */
+    void checkInvariants() {
+        // meta-assertions
+        // assert lock.isHeldByCurrentThread();
+        try {
+            // Unlike ArrayDeque, we have a count field but no spare slot.
+            // We prefer ArrayDeque's strategy (and the names of its fields!),
+            // but our field layout is baked into the serial form, and so is
+            // too annoying to change.
+            //
+            // putIndex == takeIndex must be disambiguated by checking count.
+            int capacity = items.length;
+            // assert capacity > 0;
+            // assert takeIndex >= 0 && takeIndex < capacity;
+            // assert putIndex >= 0 && putIndex < capacity;
+            // assert count <= capacity;
+            // assert takeIndex == putIndex || items[takeIndex] != null;
+            // assert count == capacity || items[putIndex] == null;
+            // assert takeIndex == putIndex || items[dec(putIndex, capacity)] != null;
+        } catch (Throwable t) {
+            System.err.printf("takeIndex=%d putIndex=%d count=%d capacity=%d%n",
+                              takeIndex, putIndex, count, items.length);
+            System.err.printf("items=%s%n",
+                              Arrays.toString(items));
+            throw t;
+        }
+    }
+
 }