jdk/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java
changeset 2 90ce3da70b43
child 4110 ac033ba6ede4
equal deleted inserted replaced
0:fd16c54261b3 2:90ce3da70b43
       
     1 /*
       
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     3  *
       
     4  * This code is free software; you can redistribute it and/or modify it
       
     5  * under the terms of the GNU General Public License version 2 only, as
       
     6  * published by the Free Software Foundation.  Sun designates this
       
     7  * particular file as subject to the "Classpath" exception as provided
       
     8  * by Sun in the LICENSE file that accompanied this code.
       
     9  *
       
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    13  * version 2 for more details (a copy is included in the LICENSE file that
       
    14  * accompanied this code).
       
    15  *
       
    16  * You should have received a copy of the GNU General Public License version
       
    17  * 2 along with this work; if not, write to the Free Software Foundation,
       
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    19  *
       
    20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    21  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    22  * have any questions.
       
    23  */
       
    24 
       
    25 /*
       
    26  * This file is available under and governed by the GNU General Public
       
    27  * License version 2 only, as published by the Free Software Foundation.
       
    28  * However, the following notice accompanied the original version of this
       
    29  * file:
       
    30  *
       
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
       
    32  * Expert Group and released to the public domain, as explained at
       
    33  * http://creativecommons.org/licenses/publicdomain
       
    34  */
       
    35 
       
    36 package java.util.concurrent;
       
    37 import java.util.concurrent.locks.*;
       
    38 import java.util.*;
       
    39 
       
    40 /**
       
    41  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
       
    42  * array.  This queue orders elements FIFO (first-in-first-out).  The
       
    43  * <em>head</em> of the queue is that element that has been on the
       
    44  * queue the longest time.  The <em>tail</em> of the queue is that
       
    45  * element that has been on the queue the shortest time. New elements
       
    46  * are inserted at the tail of the queue, and the queue retrieval
       
    47  * operations obtain elements at the head of the queue.
       
    48  *
       
    49  * <p>This is a classic &quot;bounded buffer&quot;, in which a
       
    50  * fixed-sized array holds elements inserted by producers and
       
    51  * extracted by consumers.  Once created, the capacity cannot be
       
    52  * increased.  Attempts to <tt>put</tt> an element into a full queue
       
    53  * will result in the operation blocking; attempts to <tt>take</tt> an
       
    54  * element from an empty queue will similarly block.
       
    55  *
       
    56  * <p> This class supports an optional fairness policy for ordering
       
    57  * waiting producer and consumer threads.  By default, this ordering
       
    58  * is not guaranteed. However, a queue constructed with fairness set
       
    59  * to <tt>true</tt> grants threads access in FIFO order. Fairness
       
    60  * generally decreases throughput but reduces variability and avoids
       
    61  * starvation.
       
    62  *
       
    63  * <p>This class and its iterator implement all of the
       
    64  * <em>optional</em> methods of the {@link Collection} and {@link
       
    65  * Iterator} interfaces.
       
    66  *
       
    67  * <p>This class is a member of the
       
    68  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
       
    69  * Java Collections Framework</a>.
       
    70  *
       
    71  * @since 1.5
       
    72  * @author Doug Lea
       
    73  * @param <E> the type of elements held in this collection
       
    74  */
       
    75 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
       
    76         implements BlockingQueue<E>, java.io.Serializable {
       
    77 
       
    78     /**
       
    79      * Serialization ID. This class relies on default serialization
       
    80      * even for the items array, which is default-serialized, even if
       
    81      * it is empty. Otherwise it could not be declared final, which is
       
    82      * necessary here.
       
    83      */
       
    84     private static final long serialVersionUID = -817911632652898426L;
       
    85 
       
    86     /** The queued items  */
       
    87     private final E[] items;
       
    88     /** items index for next take, poll or remove */
       
    89     private int takeIndex;
       
    90     /** items index for next put, offer, or add. */
       
    91     private int putIndex;
       
    92     /** Number of items in the queue */
       
    93     private int count;
       
    94 
       
    95     /*
       
    96      * Concurrency control uses the classic two-condition algorithm
       
    97      * found in any textbook.
       
    98      */
       
    99 
       
   100     /** Main lock guarding all access */
       
   101     private final ReentrantLock lock;
       
   102     /** Condition for waiting takes */
       
   103     private final Condition notEmpty;
       
   104     /** Condition for waiting puts */
       
   105     private final Condition notFull;
       
   106 
       
   107     // Internal helper methods
       
   108 
       
   109     /**
       
   110      * Circularly increment i.
       
   111      */
       
   112     final int inc(int i) {
       
   113         return (++i == items.length)? 0 : i;
       
   114     }
       
   115 
       
   116     /**
       
   117      * Inserts element at current put position, advances, and signals.
       
   118      * Call only when holding lock.
       
   119      */
       
   120     private void insert(E x) {
       
   121         items[putIndex] = x;
       
   122         putIndex = inc(putIndex);
       
   123         ++count;
       
   124         notEmpty.signal();
       
   125     }
       
   126 
       
   127     /**
       
   128      * Extracts element at current take position, advances, and signals.
       
   129      * Call only when holding lock.
       
   130      */
       
   131     private E extract() {
       
   132         final E[] items = this.items;
       
   133         E x = items[takeIndex];
       
   134         items[takeIndex] = null;
       
   135         takeIndex = inc(takeIndex);
       
   136         --count;
       
   137         notFull.signal();
       
   138         return x;
       
   139     }
       
   140 
       
   141     /**
       
   142      * Utility for remove and iterator.remove: Delete item at position i.
       
   143      * Call only when holding lock.
       
   144      */
       
   145     void removeAt(int i) {
       
   146         final E[] items = this.items;
       
   147         // if removing front item, just advance
       
   148         if (i == takeIndex) {
       
   149             items[takeIndex] = null;
       
   150             takeIndex = inc(takeIndex);
       
   151         } else {
       
   152             // slide over all others up through putIndex.
       
   153             for (;;) {
       
   154                 int nexti = inc(i);
       
   155                 if (nexti != putIndex) {
       
   156                     items[i] = items[nexti];
       
   157                     i = nexti;
       
   158                 } else {
       
   159                     items[i] = null;
       
   160                     putIndex = i;
       
   161                     break;
       
   162                 }
       
   163             }
       
   164         }
       
   165         --count;
       
   166         notFull.signal();
       
   167     }
       
   168 
       
   169     /**
       
   170      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
       
   171      * capacity and default access policy.
       
   172      *
       
   173      * @param capacity the capacity of this queue
       
   174      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
       
   175      */
       
   176     public ArrayBlockingQueue(int capacity) {
       
   177         this(capacity, false);
       
   178     }
       
   179 
       
   180     /**
       
   181      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
       
   182      * capacity and the specified access policy.
       
   183      *
       
   184      * @param capacity the capacity of this queue
       
   185      * @param fair if <tt>true</tt> then queue accesses for threads blocked
       
   186      *        on insertion or removal, are processed in FIFO order;
       
   187      *        if <tt>false</tt> the access order is unspecified.
       
   188      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
       
   189      */
       
   190     public ArrayBlockingQueue(int capacity, boolean fair) {
       
   191         if (capacity <= 0)
       
   192             throw new IllegalArgumentException();
       
   193         this.items = (E[]) new Object[capacity];
       
   194         lock = new ReentrantLock(fair);
       
   195         notEmpty = lock.newCondition();
       
   196         notFull =  lock.newCondition();
       
   197     }
       
   198 
       
   199     /**
       
   200      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
       
   201      * capacity, the specified access policy and initially containing the
       
   202      * elements of the given collection,
       
   203      * added in traversal order of the collection's iterator.
       
   204      *
       
   205      * @param capacity the capacity of this queue
       
   206      * @param fair if <tt>true</tt> then queue accesses for threads blocked
       
   207      *        on insertion or removal, are processed in FIFO order;
       
   208      *        if <tt>false</tt> the access order is unspecified.
       
   209      * @param c the collection of elements to initially contain
       
   210      * @throws IllegalArgumentException if <tt>capacity</tt> is less than
       
   211      *         <tt>c.size()</tt>, or less than 1.
       
   212      * @throws NullPointerException if the specified collection or any
       
   213      *         of its elements are null
       
   214      */
       
   215     public ArrayBlockingQueue(int capacity, boolean fair,
       
   216                               Collection<? extends E> c) {
       
   217         this(capacity, fair);
       
   218         if (capacity < c.size())
       
   219             throw new IllegalArgumentException();
       
   220 
       
   221         for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
       
   222             add(it.next());
       
   223     }
       
   224 
       
   225     /**
       
   226      * Inserts the specified element at the tail of this queue if it is
       
   227      * possible to do so immediately without exceeding the queue's capacity,
       
   228      * returning <tt>true</tt> upon success and throwing an
       
   229      * <tt>IllegalStateException</tt> if this queue is full.
       
   230      *
       
   231      * @param e the element to add
       
   232      * @return <tt>true</tt> (as specified by {@link Collection#add})
       
   233      * @throws IllegalStateException if this queue is full
       
   234      * @throws NullPointerException if the specified element is null
       
   235      */
       
   236     public boolean add(E e) {
       
   237         return super.add(e);
       
   238     }
       
   239 
       
   240     /**
       
   241      * Inserts the specified element at the tail of this queue if it is
       
   242      * possible to do so immediately without exceeding the queue's capacity,
       
   243      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
       
   244      * is full.  This method is generally preferable to method {@link #add},
       
   245      * which can fail to insert an element only by throwing an exception.
       
   246      *
       
   247      * @throws NullPointerException if the specified element is null
       
   248      */
       
   249     public boolean offer(E e) {
       
   250         if (e == null) throw new NullPointerException();
       
   251         final ReentrantLock lock = this.lock;
       
   252         lock.lock();
       
   253         try {
       
   254             if (count == items.length)
       
   255                 return false;
       
   256             else {
       
   257                 insert(e);
       
   258                 return true;
       
   259             }
       
   260         } finally {
       
   261             lock.unlock();
       
   262         }
       
   263     }
       
   264 
       
   265     /**
       
   266      * Inserts the specified element at the tail of this queue, waiting
       
   267      * for space to become available if the queue is full.
       
   268      *
       
   269      * @throws InterruptedException {@inheritDoc}
       
   270      * @throws NullPointerException {@inheritDoc}
       
   271      */
       
   272     public void put(E e) throws InterruptedException {
       
   273         if (e == null) throw new NullPointerException();
       
   274         final E[] items = this.items;
       
   275         final ReentrantLock lock = this.lock;
       
   276         lock.lockInterruptibly();
       
   277         try {
       
   278             try {
       
   279                 while (count == items.length)
       
   280                     notFull.await();
       
   281             } catch (InterruptedException ie) {
       
   282                 notFull.signal(); // propagate to non-interrupted thread
       
   283                 throw ie;
       
   284             }
       
   285             insert(e);
       
   286         } finally {
       
   287             lock.unlock();
       
   288         }
       
   289     }
       
   290 
       
   291     /**
       
   292      * Inserts the specified element at the tail of this queue, waiting
       
   293      * up to the specified wait time for space to become available if
       
   294      * the queue is full.
       
   295      *
       
   296      * @throws InterruptedException {@inheritDoc}
       
   297      * @throws NullPointerException {@inheritDoc}
       
   298      */
       
   299     public boolean offer(E e, long timeout, TimeUnit unit)
       
   300         throws InterruptedException {
       
   301 
       
   302         if (e == null) throw new NullPointerException();
       
   303         long nanos = unit.toNanos(timeout);
       
   304         final ReentrantLock lock = this.lock;
       
   305         lock.lockInterruptibly();
       
   306         try {
       
   307             for (;;) {
       
   308                 if (count != items.length) {
       
   309                     insert(e);
       
   310                     return true;
       
   311                 }
       
   312                 if (nanos <= 0)
       
   313                     return false;
       
   314                 try {
       
   315                     nanos = notFull.awaitNanos(nanos);
       
   316                 } catch (InterruptedException ie) {
       
   317                     notFull.signal(); // propagate to non-interrupted thread
       
   318                     throw ie;
       
   319                 }
       
   320             }
       
   321         } finally {
       
   322             lock.unlock();
       
   323         }
       
   324     }
       
   325 
       
   326     public E poll() {
       
   327         final ReentrantLock lock = this.lock;
       
   328         lock.lock();
       
   329         try {
       
   330             if (count == 0)
       
   331                 return null;
       
   332             E x = extract();
       
   333             return x;
       
   334         } finally {
       
   335             lock.unlock();
       
   336         }
       
   337     }
       
   338 
       
   339     public E take() throws InterruptedException {
       
   340         final ReentrantLock lock = this.lock;
       
   341         lock.lockInterruptibly();
       
   342         try {
       
   343             try {
       
   344                 while (count == 0)
       
   345                     notEmpty.await();
       
   346             } catch (InterruptedException ie) {
       
   347                 notEmpty.signal(); // propagate to non-interrupted thread
       
   348                 throw ie;
       
   349             }
       
   350             E x = extract();
       
   351             return x;
       
   352         } finally {
       
   353             lock.unlock();
       
   354         }
       
   355     }
       
   356 
       
   357     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
       
   358         long nanos = unit.toNanos(timeout);
       
   359         final ReentrantLock lock = this.lock;
       
   360         lock.lockInterruptibly();
       
   361         try {
       
   362             for (;;) {
       
   363                 if (count != 0) {
       
   364                     E x = extract();
       
   365                     return x;
       
   366                 }
       
   367                 if (nanos <= 0)
       
   368                     return null;
       
   369                 try {
       
   370                     nanos = notEmpty.awaitNanos(nanos);
       
   371                 } catch (InterruptedException ie) {
       
   372                     notEmpty.signal(); // propagate to non-interrupted thread
       
   373                     throw ie;
       
   374                 }
       
   375 
       
   376             }
       
   377         } finally {
       
   378             lock.unlock();
       
   379         }
       
   380     }
       
   381 
       
   382     public E peek() {
       
   383         final ReentrantLock lock = this.lock;
       
   384         lock.lock();
       
   385         try {
       
   386             return (count == 0) ? null : items[takeIndex];
       
   387         } finally {
       
   388             lock.unlock();
       
   389         }
       
   390     }
       
   391 
       
   392     // this doc comment is overridden to remove the reference to collections
       
   393     // greater in size than Integer.MAX_VALUE
       
   394     /**
       
   395      * Returns the number of elements in this queue.
       
   396      *
       
   397      * @return the number of elements in this queue
       
   398      */
       
   399     public int size() {
       
   400         final ReentrantLock lock = this.lock;
       
   401         lock.lock();
       
   402         try {
       
   403             return count;
       
   404         } finally {
       
   405             lock.unlock();
       
   406         }
       
   407     }
       
   408 
       
   409     // this doc comment is a modified copy of the inherited doc comment,
       
   410     // without the reference to unlimited queues.
       
   411     /**
       
   412      * Returns the number of additional elements that this queue can ideally
       
   413      * (in the absence of memory or resource constraints) accept without
       
   414      * blocking. This is always equal to the initial capacity of this queue
       
   415      * less the current <tt>size</tt> of this queue.
       
   416      *
       
   417      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
       
   418      * an element will succeed by inspecting <tt>remainingCapacity</tt>
       
   419      * because it may be the case that another thread is about to
       
   420      * insert or remove an element.
       
   421      */
       
   422     public int remainingCapacity() {
       
   423         final ReentrantLock lock = this.lock;
       
   424         lock.lock();
       
   425         try {
       
   426             return items.length - count;
       
   427         } finally {
       
   428             lock.unlock();
       
   429         }
       
   430     }
       
   431 
       
   432     /**
       
   433      * Removes a single instance of the specified element from this queue,
       
   434      * if it is present.  More formally, removes an element <tt>e</tt> such
       
   435      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
       
   436      * elements.
       
   437      * Returns <tt>true</tt> if this queue contained the specified element
       
   438      * (or equivalently, if this queue changed as a result of the call).
       
   439      *
       
   440      * @param o element to be removed from this queue, if present
       
   441      * @return <tt>true</tt> if this queue changed as a result of the call
       
   442      */
       
   443     public boolean remove(Object o) {
       
   444         if (o == null) return false;
       
   445         final E[] items = this.items;
       
   446         final ReentrantLock lock = this.lock;
       
   447         lock.lock();
       
   448         try {
       
   449             int i = takeIndex;
       
   450             int k = 0;
       
   451             for (;;) {
       
   452                 if (k++ >= count)
       
   453                     return false;
       
   454                 if (o.equals(items[i])) {
       
   455                     removeAt(i);
       
   456                     return true;
       
   457                 }
       
   458                 i = inc(i);
       
   459             }
       
   460 
       
   461         } finally {
       
   462             lock.unlock();
       
   463         }
       
   464     }
       
   465 
       
   466     /**
       
   467      * Returns <tt>true</tt> if this queue contains the specified element.
       
   468      * More formally, returns <tt>true</tt> if and only if this queue contains
       
   469      * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
       
   470      *
       
   471      * @param o object to be checked for containment in this queue
       
   472      * @return <tt>true</tt> if this queue contains the specified element
       
   473      */
       
   474     public boolean contains(Object o) {
       
   475         if (o == null) return false;
       
   476         final E[] items = this.items;
       
   477         final ReentrantLock lock = this.lock;
       
   478         lock.lock();
       
   479         try {
       
   480             int i = takeIndex;
       
   481             int k = 0;
       
   482             while (k++ < count) {
       
   483                 if (o.equals(items[i]))
       
   484                     return true;
       
   485                 i = inc(i);
       
   486             }
       
   487             return false;
       
   488         } finally {
       
   489             lock.unlock();
       
   490         }
       
   491     }
       
   492 
       
   493     /**
       
   494      * Returns an array containing all of the elements in this queue, in
       
   495      * proper sequence.
       
   496      *
       
   497      * <p>The returned array will be "safe" in that no references to it are
       
   498      * maintained by this queue.  (In other words, this method must allocate
       
   499      * a new array).  The caller is thus free to modify the returned array.
       
   500      *
       
   501      * <p>This method acts as bridge between array-based and collection-based
       
   502      * APIs.
       
   503      *
       
   504      * @return an array containing all of the elements in this queue
       
   505      */
       
   506     public Object[] toArray() {
       
   507         final E[] items = this.items;
       
   508         final ReentrantLock lock = this.lock;
       
   509         lock.lock();
       
   510         try {
       
   511             Object[] a = new Object[count];
       
   512             int k = 0;
       
   513             int i = takeIndex;
       
   514             while (k < count) {
       
   515                 a[k++] = items[i];
       
   516                 i = inc(i);
       
   517             }
       
   518             return a;
       
   519         } finally {
       
   520             lock.unlock();
       
   521         }
       
   522     }
       
   523 
       
   524     /**
       
   525      * Returns an array containing all of the elements in this queue, in
       
   526      * proper sequence; the runtime type of the returned array is that of
       
   527      * the specified array.  If the queue fits in the specified array, it
       
   528      * is returned therein.  Otherwise, a new array is allocated with the
       
   529      * runtime type of the specified array and the size of this queue.
       
   530      *
       
   531      * <p>If this queue fits in the specified array with room to spare
       
   532      * (i.e., the array has more elements than this queue), the element in
       
   533      * the array immediately following the end of the queue is set to
       
   534      * <tt>null</tt>.
       
   535      *
       
   536      * <p>Like the {@link #toArray()} method, this method acts as bridge between
       
   537      * array-based and collection-based APIs.  Further, this method allows
       
   538      * precise control over the runtime type of the output array, and may,
       
   539      * under certain circumstances, be used to save allocation costs.
       
   540      *
       
   541      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
       
   542      * The following code can be used to dump the queue into a newly
       
   543      * allocated array of <tt>String</tt>:
       
   544      *
       
   545      * <pre>
       
   546      *     String[] y = x.toArray(new String[0]);</pre>
       
   547      *
       
   548      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
       
   549      * <tt>toArray()</tt>.
       
   550      *
       
   551      * @param a the array into which the elements of the queue are to
       
   552      *          be stored, if it is big enough; otherwise, a new array of the
       
   553      *          same runtime type is allocated for this purpose
       
   554      * @return an array containing all of the elements in this queue
       
   555      * @throws ArrayStoreException if the runtime type of the specified array
       
   556      *         is not a supertype of the runtime type of every element in
       
   557      *         this queue
       
   558      * @throws NullPointerException if the specified array is null
       
   559      */
       
   560     public <T> T[] toArray(T[] a) {
       
   561         final E[] items = this.items;
       
   562         final ReentrantLock lock = this.lock;
       
   563         lock.lock();
       
   564         try {
       
   565             if (a.length < count)
       
   566                 a = (T[])java.lang.reflect.Array.newInstance(
       
   567                     a.getClass().getComponentType(),
       
   568                     count
       
   569                     );
       
   570 
       
   571             int k = 0;
       
   572             int i = takeIndex;
       
   573             while (k < count) {
       
   574                 a[k++] = (T)items[i];
       
   575                 i = inc(i);
       
   576             }
       
   577             if (a.length > count)
       
   578                 a[count] = null;
       
   579             return a;
       
   580         } finally {
       
   581             lock.unlock();
       
   582         }
       
   583     }
       
   584 
       
   585     public String toString() {
       
   586         final ReentrantLock lock = this.lock;
       
   587         lock.lock();
       
   588         try {
       
   589             return super.toString();
       
   590         } finally {
       
   591             lock.unlock();
       
   592         }
       
   593     }
       
   594 
       
   595     /**
       
   596      * Atomically removes all of the elements from this queue.
       
   597      * The queue will be empty after this call returns.
       
   598      */
       
   599     public void clear() {
       
   600         final E[] items = this.items;
       
   601         final ReentrantLock lock = this.lock;
       
   602         lock.lock();
       
   603         try {
       
   604             int i = takeIndex;
       
   605             int k = count;
       
   606             while (k-- > 0) {
       
   607                 items[i] = null;
       
   608                 i = inc(i);
       
   609             }
       
   610             count = 0;
       
   611             putIndex = 0;
       
   612             takeIndex = 0;
       
   613             notFull.signalAll();
       
   614         } finally {
       
   615             lock.unlock();
       
   616         }
       
   617     }
       
   618 
       
   619     /**
       
   620      * @throws UnsupportedOperationException {@inheritDoc}
       
   621      * @throws ClassCastException            {@inheritDoc}
       
   622      * @throws NullPointerException          {@inheritDoc}
       
   623      * @throws IllegalArgumentException      {@inheritDoc}
       
   624      */
       
   625     public int drainTo(Collection<? super E> c) {
       
   626         if (c == null)
       
   627             throw new NullPointerException();
       
   628         if (c == this)
       
   629             throw new IllegalArgumentException();
       
   630         final E[] items = this.items;
       
   631         final ReentrantLock lock = this.lock;
       
   632         lock.lock();
       
   633         try {
       
   634             int i = takeIndex;
       
   635             int n = 0;
       
   636             int max = count;
       
   637             while (n < max) {
       
   638                 c.add(items[i]);
       
   639                 items[i] = null;
       
   640                 i = inc(i);
       
   641                 ++n;
       
   642             }
       
   643             if (n > 0) {
       
   644                 count = 0;
       
   645                 putIndex = 0;
       
   646                 takeIndex = 0;
       
   647                 notFull.signalAll();
       
   648             }
       
   649             return n;
       
   650         } finally {
       
   651             lock.unlock();
       
   652         }
       
   653     }
       
   654 
       
   655     /**
       
   656      * @throws UnsupportedOperationException {@inheritDoc}
       
   657      * @throws ClassCastException            {@inheritDoc}
       
   658      * @throws NullPointerException          {@inheritDoc}
       
   659      * @throws IllegalArgumentException      {@inheritDoc}
       
   660      */
       
   661     public int drainTo(Collection<? super E> c, int maxElements) {
       
   662         if (c == null)
       
   663             throw new NullPointerException();
       
   664         if (c == this)
       
   665             throw new IllegalArgumentException();
       
   666         if (maxElements <= 0)
       
   667             return 0;
       
   668         final E[] items = this.items;
       
   669         final ReentrantLock lock = this.lock;
       
   670         lock.lock();
       
   671         try {
       
   672             int i = takeIndex;
       
   673             int n = 0;
       
   674             int sz = count;
       
   675             int max = (maxElements < count)? maxElements : count;
       
   676             while (n < max) {
       
   677                 c.add(items[i]);
       
   678                 items[i] = null;
       
   679                 i = inc(i);
       
   680                 ++n;
       
   681             }
       
   682             if (n > 0) {
       
   683                 count -= n;
       
   684                 takeIndex = i;
       
   685                 notFull.signalAll();
       
   686             }
       
   687             return n;
       
   688         } finally {
       
   689             lock.unlock();
       
   690         }
       
   691     }
       
   692 
       
   693 
       
   694     /**
       
   695      * Returns an iterator over the elements in this queue in proper sequence.
       
   696      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
       
   697      * will never throw {@link ConcurrentModificationException},
       
   698      * and guarantees to traverse elements as they existed upon
       
   699      * construction of the iterator, and may (but is not guaranteed to)
       
   700      * reflect any modifications subsequent to construction.
       
   701      *
       
   702      * @return an iterator over the elements in this queue in proper sequence
       
   703      */
       
   704     public Iterator<E> iterator() {
       
   705         final ReentrantLock lock = this.lock;
       
   706         lock.lock();
       
   707         try {
       
   708             return new Itr();
       
   709         } finally {
       
   710             lock.unlock();
       
   711         }
       
   712     }
       
   713 
       
   714     /**
       
   715      * Iterator for ArrayBlockingQueue
       
   716      */
       
   717     private class Itr implements Iterator<E> {
       
   718         /**
       
   719          * Index of element to be returned by next,
       
   720          * or a negative number if no such.
       
   721          */
       
   722         private int nextIndex;
       
   723 
       
   724         /**
       
   725          * nextItem holds on to item fields because once we claim
       
   726          * that an element exists in hasNext(), we must return it in
       
   727          * the following next() call even if it was in the process of
       
   728          * being removed when hasNext() was called.
       
   729          */
       
   730         private E nextItem;
       
   731 
       
   732         /**
       
   733          * Index of element returned by most recent call to next.
       
   734          * Reset to -1 if this element is deleted by a call to remove.
       
   735          */
       
   736         private int lastRet;
       
   737 
       
   738         Itr() {
       
   739             lastRet = -1;
       
   740             if (count == 0)
       
   741                 nextIndex = -1;
       
   742             else {
       
   743                 nextIndex = takeIndex;
       
   744                 nextItem = items[takeIndex];
       
   745             }
       
   746         }
       
   747 
       
   748         public boolean hasNext() {
       
   749             /*
       
   750              * No sync. We can return true by mistake here
       
   751              * only if this iterator passed across threads,
       
   752              * which we don't support anyway.
       
   753              */
       
   754             return nextIndex >= 0;
       
   755         }
       
   756 
       
   757         /**
       
   758          * Checks whether nextIndex is valid; if so setting nextItem.
       
   759          * Stops iterator when either hits putIndex or sees null item.
       
   760          */
       
   761         private void checkNext() {
       
   762             if (nextIndex == putIndex) {
       
   763                 nextIndex = -1;
       
   764                 nextItem = null;
       
   765             } else {
       
   766                 nextItem = items[nextIndex];
       
   767                 if (nextItem == null)
       
   768                     nextIndex = -1;
       
   769             }
       
   770         }
       
   771 
       
   772         public E next() {
       
   773             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
       
   774             lock.lock();
       
   775             try {
       
   776                 if (nextIndex < 0)
       
   777                     throw new NoSuchElementException();
       
   778                 lastRet = nextIndex;
       
   779                 E x = nextItem;
       
   780                 nextIndex = inc(nextIndex);
       
   781                 checkNext();
       
   782                 return x;
       
   783             } finally {
       
   784                 lock.unlock();
       
   785             }
       
   786         }
       
   787 
       
   788         public void remove() {
       
   789             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
       
   790             lock.lock();
       
   791             try {
       
   792                 int i = lastRet;
       
   793                 if (i == -1)
       
   794                     throw new IllegalStateException();
       
   795                 lastRet = -1;
       
   796 
       
   797                 int ti = takeIndex;
       
   798                 removeAt(i);
       
   799                 // back up cursor (reset to front if was first element)
       
   800                 nextIndex = (i == ti) ? takeIndex : i;
       
   801                 checkNext();
       
   802             } finally {
       
   803                 lock.unlock();
       
   804             }
       
   805         }
       
   806     }
       
   807 }