jdk/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
changeset 2 90ce3da70b43
child 5506 202f599c92aa
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java	Sat Dec 01 00:00:00 2007 +0000
@@ -0,0 +1,592 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.concurrent.locks.*;
+import java.util.*;
+
+/**
+ * An unbounded {@linkplain BlockingQueue blocking queue} that uses
+ * the same ordering rules as class {@link PriorityQueue} and supplies
+ * blocking retrieval operations.  While this queue is logically
+ * unbounded, attempted additions may fail due to resource exhaustion
+ * (causing <tt>OutOfMemoryError</tt>). This class does not permit
+ * <tt>null</tt> elements.  A priority queue relying on {@linkplain
+ * Comparable natural ordering} also does not permit insertion of
+ * non-comparable objects (doing so results in
+ * <tt>ClassCastException</tt>).
+ *
+ * <p>This class and its iterator implement all of the
+ * <em>optional</em> methods of the {@link Collection} and {@link
+ * Iterator} interfaces.  The Iterator provided in method {@link
+ * #iterator()} is <em>not</em> guaranteed to traverse the elements of
+ * the PriorityBlockingQueue in any particular order. If you need
+ * ordered traversal, consider using
+ * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
+ * can be used to <em>remove</em> some or all elements in priority
+ * order and place them in another collection.
+ *
+ * <p>Operations on this class make no guarantees about the ordering
+ * of elements with equal priority. If you need to enforce an
+ * ordering, you can define custom classes or comparators that use a
+ * secondary key to break ties in primary priority values.  For
+ * example, here is a class that applies first-in-first-out
+ * tie-breaking to comparable elements. To use it, you would insert a
+ * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
+ *
+ * <pre>
+ * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
+ *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
+ *   final static AtomicLong seq = new AtomicLong();
+ *   final long seqNum;
+ *   final E entry;
+ *   public FIFOEntry(E entry) {
+ *     seqNum = seq.getAndIncrement();
+ *     this.entry = entry;
+ *   }
+ *   public E getEntry() { return entry; }
+ *   public int compareTo(FIFOEntry&lt;E&gt; other) {
+ *     int res = entry.compareTo(other.entry);
+ *     if (res == 0 &amp;&amp; other.entry != this.entry)
+ *       res = (seqNum &lt; other.seqNum ? -1 : 1);
+ *     return res;
+ *   }
+ * }</pre>
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../technotes/guides/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public class PriorityBlockingQueue<E> extends AbstractQueue<E>
+    implements BlockingQueue<E>, java.io.Serializable {
+    private static final long serialVersionUID = 5595510919245408276L;
+
+    private final PriorityQueue<E> q;
+    private final ReentrantLock lock = new ReentrantLock(true);
+    private final Condition notEmpty = lock.newCondition();
+
+    /**
+     * Creates a <tt>PriorityBlockingQueue</tt> with the default
+     * initial capacity (11) that orders its elements according to
+     * their {@linkplain Comparable natural ordering}.
+     */
+    public PriorityBlockingQueue() {
+        q = new PriorityQueue<E>();
+    }
+
+    /**
+     * Creates a <tt>PriorityBlockingQueue</tt> with the specified
+     * initial capacity that orders its elements according to their
+     * {@linkplain Comparable natural ordering}.
+     *
+     * @param initialCapacity the initial capacity for this priority queue
+     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
+     *         than 1
+     */
+    public PriorityBlockingQueue(int initialCapacity) {
+        q = new PriorityQueue<E>(initialCapacity, null);
+    }
+
+    /**
+     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
+     * capacity that orders its elements according to the specified
+     * comparator.
+     *
+     * @param initialCapacity the initial capacity for this priority queue
+     * @param  comparator the comparator that will be used to order this
+     *         priority queue.  If {@code null}, the {@linkplain Comparable
+     *         natural ordering} of the elements will be used.
+     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
+     *         than 1
+     */
+    public PriorityBlockingQueue(int initialCapacity,
+                                 Comparator<? super E> comparator) {
+        q = new PriorityQueue<E>(initialCapacity, comparator);
+    }
+
+    /**
+     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
+     * in the specified collection.  If the specified collection is a
+     * {@link SortedSet} or a {@link PriorityQueue},  this
+     * priority queue will be ordered according to the same ordering.
+     * Otherwise, this priority queue will be ordered according to the
+     * {@linkplain Comparable natural ordering} of its elements.
+     *
+     * @param  c the collection whose elements are to be placed
+     *         into this priority queue
+     * @throws ClassCastException if elements of the specified collection
+     *         cannot be compared to one another according to the priority
+     *         queue's ordering
+     * @throws NullPointerException if the specified collection or any
+     *         of its elements are null
+     */
+    public PriorityBlockingQueue(Collection<? extends E> c) {
+        q = new PriorityQueue<E>(c);
+    }
+
+    /**
+     * Inserts the specified element into this priority queue.
+     *
+     * @param e the element to add
+     * @return <tt>true</tt> (as specified by {@link Collection#add})
+     * @throws ClassCastException if the specified element cannot be compared
+     *         with elements currently in the priority queue according to the
+     *         priority queue's ordering
+     * @throws NullPointerException if the specified element is null
+     */
+    public boolean add(E e) {
+        return offer(e);
+    }
+
+    /**
+     * Inserts the specified element into this priority queue.
+     *
+     * @param e the element to add
+     * @return <tt>true</tt> (as specified by {@link Queue#offer})
+     * @throws ClassCastException if the specified element cannot be compared
+     *         with elements currently in the priority queue according to the
+     *         priority queue's ordering
+     * @throws NullPointerException if the specified element is null
+     */
+    public boolean offer(E e) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            boolean ok = q.offer(e);
+            assert ok;
+            notEmpty.signal();
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Inserts the specified element into this priority queue. As the queue is
+     * unbounded this method will never block.
+     *
+     * @param e the element to add
+     * @throws ClassCastException if the specified element cannot be compared
+     *         with elements currently in the priority queue according to the
+     *         priority queue's ordering
+     * @throws NullPointerException if the specified element is null
+     */
+    public void put(E e) {
+        offer(e); // never need to block
+    }
+
+    /**
+     * Inserts the specified element into this priority queue. As the queue is
+     * unbounded this method will never block.
+     *
+     * @param e the element to add
+     * @param timeout This parameter is ignored as the method never blocks
+     * @param unit This parameter is ignored as the method never blocks
+     * @return <tt>true</tt>
+     * @throws ClassCastException if the specified element cannot be compared
+     *         with elements currently in the priority queue according to the
+     *         priority queue's ordering
+     * @throws NullPointerException if the specified element is null
+     */
+    public boolean offer(E e, long timeout, TimeUnit unit) {
+        return offer(e); // never need to block
+    }
+
+    public E poll() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.poll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E take() throws InterruptedException {
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            try {
+                while (q.size() == 0)
+                    notEmpty.await();
+            } catch (InterruptedException ie) {
+                notEmpty.signal(); // propagate to non-interrupted thread
+                throw ie;
+            }
+            E x = q.poll();
+            assert x != null;
+            return x;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        long nanos = unit.toNanos(timeout);
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            for (;;) {
+                E x = q.poll();
+                if (x != null)
+                    return x;
+                if (nanos <= 0)
+                    return null;
+                try {
+                    nanos = notEmpty.awaitNanos(nanos);
+                } catch (InterruptedException ie) {
+                    notEmpty.signal(); // propagate to non-interrupted thread
+                    throw ie;
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E peek() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.peek();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the comparator used to order the elements in this queue,
+     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
+     * natural ordering} of its elements.
+     *
+     * @return the comparator used to order the elements in this queue,
+     *         or <tt>null</tt> if this queue uses the natural
+     *         ordering of its elements
+     */
+    public Comparator<? super E> comparator() {
+        return q.comparator();
+    }
+
+    public int size() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Always returns <tt>Integer.MAX_VALUE</tt> because
+     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
+     * @return <tt>Integer.MAX_VALUE</tt>
+     */
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Removes a single instance of the specified element from this queue,
+     * if it is present.  More formally, removes an element {@code e} such
+     * that {@code o.equals(e)}, if this queue contains one or more such
+     * elements.  Returns {@code true} if and only if this queue contained
+     * the specified element (or equivalently, if this queue changed as a
+     * result of the call).
+     *
+     * @param o element to be removed from this queue, if present
+     * @return <tt>true</tt> if this queue changed as a result of the call
+     */
+    public boolean remove(Object o) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.remove(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns {@code true} if this queue contains the specified element.
+     * More formally, returns {@code true} if and only if this queue contains
+     * at least one element {@code e} such that {@code o.equals(e)}.
+     *
+     * @param o object to be checked for containment in this queue
+     * @return <tt>true</tt> if this queue contains the specified element
+     */
+    public boolean contains(Object o) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.contains(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an array containing all of the elements in this queue.
+     * The returned array elements are in no particular order.
+     *
+     * <p>The returned array will be "safe" in that no references to it are
+     * maintained by this queue.  (In other words, this method must allocate
+     * a new array).  The caller is thus free to modify the returned array.
+     *
+     * <p>This method acts as bridge between array-based and collection-based
+     * APIs.
+     *
+     * @return an array containing all of the elements in this queue
+     */
+    public Object[] toArray() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toArray();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public String toString() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toString();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @throws UnsupportedOperationException {@inheritDoc}
+     * @throws ClassCastException            {@inheritDoc}
+     * @throws NullPointerException          {@inheritDoc}
+     * @throws IllegalArgumentException      {@inheritDoc}
+     */
+    public int drainTo(Collection<? super E> c) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            int n = 0;
+            E e;
+            while ( (e = q.poll()) != null) {
+                c.add(e);
+                ++n;
+            }
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @throws UnsupportedOperationException {@inheritDoc}
+     * @throws ClassCastException            {@inheritDoc}
+     * @throws NullPointerException          {@inheritDoc}
+     * @throws IllegalArgumentException      {@inheritDoc}
+     */
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        if (maxElements <= 0)
+            return 0;
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            int n = 0;
+            E e;
+            while (n < maxElements && (e = q.poll()) != null) {
+                c.add(e);
+                ++n;
+            }
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Atomically removes all of the elements from this queue.
+     * The queue will be empty after this call returns.
+     */
+    public void clear() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            q.clear();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an array containing all of the elements in this queue; the
+     * runtime type of the returned array is that of the specified array.
+     * The returned array elements are in no particular order.
+     * If the queue fits in the specified array, it is returned therein.
+     * Otherwise, a new array is allocated with the runtime type of the
+     * specified array and the size of this queue.
+     *
+     * <p>If this queue fits in the specified array with room to spare
+     * (i.e., the array has more elements than this queue), the element in
+     * the array immediately following the end of the queue is set to
+     * <tt>null</tt>.
+     *
+     * <p>Like the {@link #toArray()} method, this method acts as bridge between
+     * array-based and collection-based APIs.  Further, this method allows
+     * precise control over the runtime type of the output array, and may,
+     * under certain circumstances, be used to save allocation costs.
+     *
+     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
+     * The following code can be used to dump the queue into a newly
+     * allocated array of <tt>String</tt>:
+     *
+     * <pre>
+     *     String[] y = x.toArray(new String[0]);</pre>
+     *
+     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
+     * <tt>toArray()</tt>.
+     *
+     * @param a the array into which the elements of the queue are to
+     *          be stored, if it is big enough; otherwise, a new array of the
+     *          same runtime type is allocated for this purpose
+     * @return an array containing all of the elements in this queue
+     * @throws ArrayStoreException if the runtime type of the specified array
+     *         is not a supertype of the runtime type of every element in
+     *         this queue
+     * @throws NullPointerException if the specified array is null
+     */
+    public <T> T[] toArray(T[] a) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toArray(a);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an iterator over the elements in this queue. The
+     * iterator does not return the elements in any particular order.
+     * The returned <tt>Iterator</tt> is a "weakly consistent"
+     * iterator that will never throw {@link
+     * ConcurrentModificationException}, and guarantees to traverse
+     * elements as they existed upon construction of the iterator, and
+     * may (but is not guaranteed to) reflect any modifications
+     * subsequent to construction.
+     *
+     * @return an iterator over the elements in this queue
+     */
+    public Iterator<E> iterator() {
+        return new Itr(toArray());
+    }
+
+    /**
+     * Snapshot iterator that works off copy of underlying q array.
+     */
+    private class Itr implements Iterator<E> {
+        final Object[] array; // Array of all elements
+        int cursor;           // index of next element to return;
+        int lastRet;          // index of last element, or -1 if no such
+
+        Itr(Object[] array) {
+            lastRet = -1;
+            this.array = array;
+        }
+
+        public boolean hasNext() {
+            return cursor < array.length;
+        }
+
+        public E next() {
+            if (cursor >= array.length)
+                throw new NoSuchElementException();
+            lastRet = cursor;
+            return (E)array[cursor++];
+        }
+
+        public void remove() {
+            if (lastRet < 0)
+                throw new IllegalStateException();
+            Object x = array[lastRet];
+            lastRet = -1;
+            // Traverse underlying queue to find == element,
+            // not just a .equals element.
+            lock.lock();
+            try {
+                for (Iterator it = q.iterator(); it.hasNext(); ) {
+                    if (it.next() == x) {
+                        it.remove();
+                        return;
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Saves the state to a stream (that is, serializes it).  This
+     * merely wraps default serialization within lock.  The
+     * serialization strategy for items is left to underlying
+     * Queue. Note that locking is not needed on deserialization, so
+     * readObject is not defined, just relying on default.
+     */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        lock.lock();
+        try {
+            s.defaultWriteObject();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}