--- a/jdk/src/share/classes/java/util/concurrent/DelayQueue.java Mon Mar 10 23:23:47 2008 -0700
+++ b/jdk/src/share/classes/java/util/concurrent/DelayQueue.java Mon Mar 10 23:23:47 2008 -0700
@@ -69,10 +69,34 @@
implements BlockingQueue<E> {
private transient final ReentrantLock lock = new ReentrantLock();
- private transient final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
+ * Thread designated to wait for the element at the head of
+ * the queue. This variant of the Leader-Follower pattern
+ * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
+ * minimize unnecessary timed waiting. When a thread becomes
+ * the leader, it waits only for the next delay to elapse, but
+ * other threads await indefinitely. The leader thread must
+ * signal some other thread before returning from take() or
+ * poll(...), unless some other thread becomes leader in the
+ * interim. Whenever the head of the queue is replaced with
+ * an element with an earlier expiration time, the leader
+ * field is invalidated by being reset to null, and some
+ * waiting thread, but not necessarily the current leader, is
+ * signalled. So waiting threads must be prepared to acquire
+ * and lose leadership while waiting.
+ */
+ private Thread leader = null;
+
+ /**
+ * Condition signalled when a newer element becomes available
+ * at the head of the queue or a new thread may need to
+ * become leader.
+ */
+ private final Condition available = lock.newCondition();
+
+ /**
* Creates a new <tt>DelayQueue</tt> that is initially empty.
*/
public DelayQueue() {}
@@ -111,10 +135,11 @@
final ReentrantLock lock = this.lock;
lock.lock();
try {
- E first = q.peek();
q.offer(e);
- if (first == null || e.compareTo(first) < 0)
- available.signalAll();
+ if (q.peek() == e) {
+ leader = null;
+ available.signal();
+ }
return true;
} finally {
lock.unlock();
@@ -160,13 +185,8 @@
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
- else {
- E x = q.poll();
- assert x != null;
- if (q.size() != 0)
- available.signalAll();
- return x;
- }
+ else
+ return q.poll();
} finally {
lock.unlock();
}
@@ -185,23 +205,29 @@
try {
for (;;) {
E first = q.peek();
- if (first == null) {
+ if (first == null)
available.await();
- } else {
- long delay = first.getDelay(TimeUnit.NANOSECONDS);
- if (delay > 0) {
- long tl = available.awaitNanos(delay);
- } else {
- E x = q.poll();
- assert x != null;
- if (q.size() != 0)
- available.signalAll(); // wake up other takers
- return x;
-
+ else {
+ long delay = first.getDelay(TimeUnit.NANOSECONDS);
+ if (delay <= 0)
+ return q.poll();
+ else if (leader != null)
+ available.await();
+ else {
+ Thread thisThread = Thread.currentThread();
+ leader = thisThread;
+ try {
+ available.awaitNanos(delay);
+ } finally {
+ if (leader == thisThread)
+ leader = null;
+ }
}
}
}
} finally {
+ if (leader == null && q.peek() != null)
+ available.signal();
lock.unlock();
}
}
@@ -230,23 +256,28 @@
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
- if (delay > 0) {
- if (nanos <= 0)
- return null;
- if (delay > nanos)
- delay = nanos;
- long timeLeft = available.awaitNanos(delay);
- nanos -= delay - timeLeft;
- } else {
- E x = q.poll();
- assert x != null;
- if (q.size() != 0)
- available.signalAll();
- return x;
+ if (delay <= 0)
+ return q.poll();
+ if (nanos <= 0)
+ return null;
+ if (nanos < delay || leader != null)
+ nanos = available.awaitNanos(nanos);
+ else {
+ Thread thisThread = Thread.currentThread();
+ leader = thisThread;
+ try {
+ long timeLeft = available.awaitNanos(delay);
+ nanos -= delay - timeLeft;
+ } finally {
+ if (leader == thisThread)
+ leader = null;
+ }
}
}
}
} finally {
+ if (leader == null && q.peek() != null)
+ available.signal();
lock.unlock();
}
}
@@ -303,8 +334,6 @@
c.add(q.poll());
++n;
}
- if (n > 0)
- available.signalAll();
return n;
} finally {
lock.unlock();
@@ -335,8 +364,6 @@
c.add(q.poll());
++n;
}
- if (n > 0)
- available.signalAll();
return n;
} finally {
lock.unlock();
@@ -485,6 +512,7 @@
return cursor < array.length;
}
+ @SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();