--- a/jdk/src/share/classes/java/util/concurrent/DelayQueue.java Tue Jul 02 15:58:09 2013 -0700
+++ b/jdk/src/share/classes/java/util/concurrent/DelayQueue.java Wed Jul 03 11:58:09 2013 +0200
@@ -33,28 +33,31 @@
* http://creativecommons.org/publicdomain/zero/1.0/
*/
-
package java.util.concurrent;
-import java.util.concurrent.locks.*;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
/**
* An unbounded {@linkplain BlockingQueue blocking queue} of
- * <tt>Delayed</tt> elements, in which an element can only be taken
+ * {@code Delayed} elements, in which an element can only be taken
* when its delay has expired. The <em>head</em> of the queue is that
- * <tt>Delayed</tt> element whose delay expired furthest in the
- * past. If no delay has expired there is no head and <tt>poll</tt>
- * will return <tt>null</tt>. Expiration occurs when an element's
- * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
+ * {@code Delayed} element whose delay expired furthest in the
+ * past. If no delay has expired there is no head and {@code poll}
+ * will return {@code null}. Expiration occurs when an element's
+ * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
* than or equal to zero. Even though unexpired elements cannot be
- * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
- * treated as normal elements. For example, the <tt>size</tt> method
+ * removed using {@code take} or {@code poll}, they are otherwise
+ * treated as normal elements. For example, the {@code size} method
* returns the count of both expired and unexpired elements.
* This queue does not permit null elements.
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
- * Iterator} interfaces.
+ * Iterator} interfaces. The Iterator provided in method {@link
+ * #iterator()} is <em>not</em> guaranteed to traverse the elements of
+ * the DelayQueue in any particular order.
*
* <p>This class is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
@@ -64,11 +67,10 @@
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/
-
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
- private transient final ReentrantLock lock = new ReentrantLock();
+ private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
@@ -97,12 +99,12 @@
private final Condition available = lock.newCondition();
/**
- * Creates a new <tt>DelayQueue</tt> that is initially empty.
+ * Creates a new {@code DelayQueue} that is initially empty.
*/
public DelayQueue() {}
/**
- * Creates a <tt>DelayQueue</tt> initially containing the elements of the
+ * Creates a {@code DelayQueue} initially containing the elements of the
* given collection of {@link Delayed} instances.
*
* @param c the collection of elements to initially contain
@@ -117,7 +119,7 @@
* Inserts the specified element into this delay queue.
*
* @param e the element to add
- * @return <tt>true</tt> (as specified by {@link Collection#add})
+ * @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
@@ -128,7 +130,7 @@
* Inserts the specified element into this delay queue.
*
* @param e the element to add
- * @return <tt>true</tt>
+ * @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
@@ -164,7 +166,7 @@
* @param e the element to add
* @param timeout This parameter is ignored as the method never blocks
* @param unit This parameter is ignored as the method never blocks
- * @return <tt>true</tt>
+ * @return {@code true}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
@@ -172,10 +174,10 @@
}
/**
- * Retrieves and removes the head of this queue, or returns <tt>null</tt>
+ * Retrieves and removes the head of this queue, or returns {@code null}
* if this queue has no elements with an expired delay.
*
- * @return the head of this queue, or <tt>null</tt> if this
+ * @return the head of this queue, or {@code null} if this
* queue has no elements with an expired delay
*/
public E poll() {
@@ -183,7 +185,7 @@
lock.lock();
try {
E first = q.peek();
- if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+ if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
@@ -208,10 +210,11 @@
if (first == null)
available.await();
else {
- long delay = first.getDelay(TimeUnit.NANOSECONDS);
+ long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
- else if (leader != null)
+ first = null; // don't retain ref while waiting
+ if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
@@ -237,7 +240,7 @@
* until an element with an expired delay is available on this queue,
* or the specified wait time expires.
*
- * @return the head of this queue, or <tt>null</tt> if the
+ * @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
@@ -255,11 +258,12 @@
else
nanos = available.awaitNanos(nanos);
} else {
- long delay = first.getDelay(TimeUnit.NANOSECONDS);
+ long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
+ first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
@@ -284,13 +288,13 @@
/**
* Retrieves, but does not remove, the head of this queue, or
- * returns <tt>null</tt> if this queue is empty. Unlike
- * <tt>poll</tt>, if no expired elements are available in the queue,
+ * returns {@code null} if this queue is empty. Unlike
+ * {@code poll}, if no expired elements are available in the queue,
* this method returns the element that will expire next,
* if one exists.
*
- * @return the head of this queue, or <tt>null</tt> if this
- * queue is empty.
+ * @return the head of this queue, or {@code null} if this
+ * queue is empty
*/
public E peek() {
final ReentrantLock lock = this.lock;
@@ -313,6 +317,17 @@
}
/**
+ * Returns first element only if it is expired.
+ * Used only by drainTo. Call only when holding lock.
+ */
+ private E peekExpired() {
+ // assert lock.isHeldByCurrentThread();
+ E first = q.peek();
+ return (first == null || first.getDelay(NANOSECONDS) > 0) ?
+ null : first;
+ }
+
+ /**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
@@ -327,11 +342,9 @@
lock.lock();
try {
int n = 0;
- for (;;) {
- E first = q.peek();
- if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
- break;
- c.add(q.poll());
+ for (E e; (e = peekExpired()) != null;) {
+ c.add(e); // In this order, in case add() throws.
+ q.poll();
++n;
}
return n;
@@ -357,11 +370,9 @@
lock.lock();
try {
int n = 0;
- while (n < maxElements) {
- E first = q.peek();
- if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
- break;
- c.add(q.poll());
+ for (E e; n < maxElements && (e = peekExpired()) != null;) {
+ c.add(e); // In this order, in case add() throws.
+ q.poll();
++n;
}
return n;
@@ -387,10 +398,10 @@
}
/**
- * Always returns <tt>Integer.MAX_VALUE</tt> because
- * a <tt>DelayQueue</tt> is not capacity constrained.
+ * Always returns {@code Integer.MAX_VALUE} because
+ * a {@code DelayQueue} is not capacity constrained.
*
- * @return <tt>Integer.MAX_VALUE</tt>
+ * @return {@code Integer.MAX_VALUE}
*/
public int remainingCapacity() {
return Integer.MAX_VALUE;
@@ -430,7 +441,7 @@
* <p>If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
- * <tt>null</tt>.
+ * {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
@@ -438,13 +449,12 @@
* under certain circumstances, be used to save allocation costs.
*
* <p>The following code can be used to dump a delay queue into a newly
- * allocated array of <tt>Delayed</tt>:
+ * allocated array of {@code Delayed}:
*
- * <pre>
- * Delayed[] a = q.toArray(new Delayed[0]);</pre>
+ * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
*
- * Note that <tt>toArray(new Object[0])</tt> is identical in function to
- * <tt>toArray()</tt>.
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
@@ -480,6 +490,24 @@
}
/**
+ * Identity-based version for use in Itr.remove
+ */
+ void removeEQ(Object o) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
+ if (o == it.next()) {
+ it.remove();
+ break;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
* Returns an iterator over all the elements (both expired and
* unexpired) in this queue. The iterator does not return the
* elements in any particular order.
@@ -502,7 +530,7 @@
*/
private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
- int cursor; // index of next element to return;
+ int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such
Itr(Object[] array) {
@@ -525,21 +553,8 @@
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
- Object x = array[lastRet];
+ removeEQ(array[lastRet]);
lastRet = -1;
- // Traverse underlying queue to find == element,
- // not just a .equals element.
- lock.lock();
- try {
- for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
- if (it.next() == x) {
- it.remove();
- return;
- }
- }
- } finally {
- lock.unlock();
- }
}
}