jdk/src/share/classes/java/util/concurrent/DelayQueue.java
changeset 2 90ce3da70b43
child 60 429af02854d2
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/DelayQueue.java	Sat Dec 01 00:00:00 2007 +0000
@@ -0,0 +1,516 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+import java.util.*;
+
+/**
+ * An unbounded {@linkplain BlockingQueue blocking queue} of
+ * <tt>Delayed</tt> elements, in which an element can only be taken
+ * when its delay has expired.  The <em>head</em> of the queue is that
+ * <tt>Delayed</tt> element whose delay expired furthest in the
+ * past.  If no delay has expired there is no head and <tt>poll</tt>
+ * will return <tt>null</tt>. Expiration occurs when an element's
+ * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
+ * than or equal to zero.  Even though unexpired elements cannot be
+ * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
+ * treated as normal elements. For example, the <tt>size</tt> method
+ * returns the count of both expired and unexpired elements.
+ * This queue does not permit null elements.
+ *
+ * <p>This class and its iterator implement all of the
+ * <em>optional</em> methods of the {@link Collection} and {@link
+ * Iterator} interfaces.
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../technotes/guides/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+
+public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
+    implements BlockingQueue<E> {
+
+    private transient final ReentrantLock lock = new ReentrantLock();
+    private transient final Condition available = lock.newCondition();
+    private final PriorityQueue<E> q = new PriorityQueue<E>();
+
+    /**
+     * Creates a new <tt>DelayQueue</tt> that is initially empty.
+     */
+    public DelayQueue() {}
+
+    /**
+     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
+     * given collection of {@link Delayed} instances.
+     *
+     * @param c the collection of elements to initially contain
+     * @throws NullPointerException if the specified collection or any
+     *         of its elements are null
+     */
+    public DelayQueue(Collection<? extends E> c) {
+        this.addAll(c);
+    }
+
+    /**
+     * Inserts the specified element into this delay queue.
+     *
+     * @param e the element to add
+     * @return <tt>true</tt> (as specified by {@link Collection#add})
+     * @throws NullPointerException if the specified element is null
+     */
+    public boolean add(E e) {
+        return offer(e);
+    }
+
+    /**
+     * Inserts the specified element into this delay queue.
+     *
+     * @param e the element to add
+     * @return <tt>true</tt>
+     * @throws NullPointerException if the specified element is null
+     */
+    public boolean offer(E e) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            E first = q.peek();
+            q.offer(e);
+            if (first == null || e.compareTo(first) < 0)
+                available.signalAll();
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Inserts the specified element into this delay queue. As the queue is
+     * unbounded this method will never block.
+     *
+     * @param e the element to add
+     * @throws NullPointerException {@inheritDoc}
+     */
+    public void put(E e) {
+        offer(e);
+    }
+
+    /**
+     * Inserts the specified element into this delay queue. As the queue is
+     * unbounded this method will never block.
+     *
+     * @param e the element to add
+     * @param timeout This parameter is ignored as the method never blocks
+     * @param unit This parameter is ignored as the method never blocks
+     * @return <tt>true</tt>
+     * @throws NullPointerException {@inheritDoc}
+     */
+    public boolean offer(E e, long timeout, TimeUnit unit) {
+        return offer(e);
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, or returns <tt>null</tt>
+     * if this queue has no elements with an expired delay.
+     *
+     * @return the head of this queue, or <tt>null</tt> if this
+     *         queue has no elements with an expired delay
+     */
+    public E poll() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            E first = q.peek();
+            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+                return null;
+            else {
+                E x = q.poll();
+                assert x != null;
+                if (q.size() != 0)
+                    available.signalAll();
+                return x;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, waiting if necessary
+     * until an element with an expired delay is available on this queue.
+     *
+     * @return the head of this queue
+     * @throws InterruptedException {@inheritDoc}
+     */
+    public E take() throws InterruptedException {
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            for (;;) {
+                E first = q.peek();
+                if (first == null) {
+                    available.await();
+                } else {
+                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
+                    if (delay > 0) {
+                        long tl = available.awaitNanos(delay);
+                    } else {
+                        E x = q.poll();
+                        assert x != null;
+                        if (q.size() != 0)
+                            available.signalAll(); // wake up other takers
+                        return x;
+
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, waiting if necessary
+     * until an element with an expired delay is available on this queue,
+     * or the specified wait time expires.
+     *
+     * @return the head of this queue, or <tt>null</tt> if the
+     *         specified waiting time elapses before an element with
+     *         an expired delay becomes available
+     * @throws InterruptedException {@inheritDoc}
+     */
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        long nanos = unit.toNanos(timeout);
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            for (;;) {
+                E first = q.peek();
+                if (first == null) {
+                    if (nanos <= 0)
+                        return null;
+                    else
+                        nanos = available.awaitNanos(nanos);
+                } else {
+                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
+                    if (delay > 0) {
+                        if (nanos <= 0)
+                            return null;
+                        if (delay > nanos)
+                            delay = nanos;
+                        long timeLeft = available.awaitNanos(delay);
+                        nanos -= delay - timeLeft;
+                    } else {
+                        E x = q.poll();
+                        assert x != null;
+                        if (q.size() != 0)
+                            available.signalAll();
+                        return x;
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves, but does not remove, the head of this queue, or
+     * returns <tt>null</tt> if this queue is empty.  Unlike
+     * <tt>poll</tt>, if no expired elements are available in the queue,
+     * this method returns the element that will expire next,
+     * if one exists.
+     *
+     * @return the head of this queue, or <tt>null</tt> if this
+     *         queue is empty.
+     */
+    public E peek() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.peek();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int size() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @throws UnsupportedOperationException {@inheritDoc}
+     * @throws ClassCastException            {@inheritDoc}
+     * @throws NullPointerException          {@inheritDoc}
+     * @throws IllegalArgumentException      {@inheritDoc}
+     */
+    public int drainTo(Collection<? super E> c) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            int n = 0;
+            for (;;) {
+                E first = q.peek();
+                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+                    break;
+                c.add(q.poll());
+                ++n;
+            }
+            if (n > 0)
+                available.signalAll();
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @throws UnsupportedOperationException {@inheritDoc}
+     * @throws ClassCastException            {@inheritDoc}
+     * @throws NullPointerException          {@inheritDoc}
+     * @throws IllegalArgumentException      {@inheritDoc}
+     */
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        if (maxElements <= 0)
+            return 0;
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            int n = 0;
+            while (n < maxElements) {
+                E first = q.peek();
+                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+                    break;
+                c.add(q.poll());
+                ++n;
+            }
+            if (n > 0)
+                available.signalAll();
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Atomically removes all of the elements from this delay queue.
+     * The queue will be empty after this call returns.
+     * Elements with an unexpired delay are not waited for; they are
+     * simply discarded from the queue.
+     */
+    public void clear() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            q.clear();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Always returns <tt>Integer.MAX_VALUE</tt> because
+     * a <tt>DelayQueue</tt> is not capacity constrained.
+     *
+     * @return <tt>Integer.MAX_VALUE</tt>
+     */
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Returns an array containing all of the elements in this queue.
+     * The returned array elements are in no particular order.
+     *
+     * <p>The returned array will be "safe" in that no references to it are
+     * maintained by this queue.  (In other words, this method must allocate
+     * a new array).  The caller is thus free to modify the returned array.
+     *
+     * <p>This method acts as bridge between array-based and collection-based
+     * APIs.
+     *
+     * @return an array containing all of the elements in this queue
+     */
+    public Object[] toArray() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toArray();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an array containing all of the elements in this queue; the
+     * runtime type of the returned array is that of the specified array.
+     * The returned array elements are in no particular order.
+     * If the queue fits in the specified array, it is returned therein.
+     * Otherwise, a new array is allocated with the runtime type of the
+     * specified array and the size of this queue.
+     *
+     * <p>If this queue fits in the specified array with room to spare
+     * (i.e., the array has more elements than this queue), the element in
+     * the array immediately following the end of the queue is set to
+     * <tt>null</tt>.
+     *
+     * <p>Like the {@link #toArray()} method, this method acts as bridge between
+     * array-based and collection-based APIs.  Further, this method allows
+     * precise control over the runtime type of the output array, and may,
+     * under certain circumstances, be used to save allocation costs.
+     *
+     * <p>The following code can be used to dump a delay queue into a newly
+     * allocated array of <tt>Delayed</tt>:
+     *
+     * <pre>
+     *     Delayed[] a = q.toArray(new Delayed[0]);</pre>
+     *
+     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
+     * <tt>toArray()</tt>.
+     *
+     * @param a the array into which the elements of the queue are to
+     *          be stored, if it is big enough; otherwise, a new array of the
+     *          same runtime type is allocated for this purpose
+     * @return an array containing all of the elements in this queue
+     * @throws ArrayStoreException if the runtime type of the specified array
+     *         is not a supertype of the runtime type of every element in
+     *         this queue
+     * @throws NullPointerException if the specified array is null
+     */
+    public <T> T[] toArray(T[] a) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toArray(a);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes a single instance of the specified element from this
+     * queue, if it is present, whether or not it has expired.
+     */
+    public boolean remove(Object o) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.remove(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an iterator over all the elements (both expired and
+     * unexpired) in this queue. The iterator does not return the
+     * elements in any particular order.  The returned
+     * <tt>Iterator</tt> is a "weakly consistent" iterator that will
+     * never throw {@link ConcurrentModificationException}, and
+     * guarantees to traverse elements as they existed upon
+     * construction of the iterator, and may (but is not guaranteed
+     * to) reflect any modifications subsequent to construction.
+     *
+     * @return an iterator over the elements in this queue
+     */
+    public Iterator<E> iterator() {
+        return new Itr(toArray());
+    }
+
+    /**
+     * Snapshot iterator that works off copy of underlying q array.
+     */
+    private class Itr implements Iterator<E> {
+        final Object[] array; // Array of all elements
+        int cursor;           // index of next element to return;
+        int lastRet;          // index of last element, or -1 if no such
+
+        Itr(Object[] array) {
+            lastRet = -1;
+            this.array = array;
+        }
+
+        public boolean hasNext() {
+            return cursor < array.length;
+        }
+
+        public E next() {
+            if (cursor >= array.length)
+                throw new NoSuchElementException();
+            lastRet = cursor;
+            return (E)array[cursor++];
+        }
+
+        public void remove() {
+            if (lastRet < 0)
+                throw new IllegalStateException();
+            Object x = array[lastRet];
+            lastRet = -1;
+            // Traverse underlying queue to find == element,
+            // not just a .equals element.
+            lock.lock();
+            try {
+                for (Iterator it = q.iterator(); it.hasNext(); ) {
+                    if (it.next() == x) {
+                        it.remove();
+                        return;
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+}