jdk/src/share/classes/java/util/concurrent/DelayQueue.java
changeset 60 429af02854d2
parent 2 90ce3da70b43
child 5506 202f599c92aa
--- a/jdk/src/share/classes/java/util/concurrent/DelayQueue.java	Mon Mar 10 23:23:47 2008 -0700
+++ b/jdk/src/share/classes/java/util/concurrent/DelayQueue.java	Mon Mar 10 23:23:47 2008 -0700
@@ -69,10 +69,34 @@
     implements BlockingQueue<E> {
 
     private transient final ReentrantLock lock = new ReentrantLock();
-    private transient final Condition available = lock.newCondition();
     private final PriorityQueue<E> q = new PriorityQueue<E>();
 
     /**
+     * Thread designated to wait for the element at the head of
+     * the queue.  This variant of the Leader-Follower pattern
+     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
+     * minimize unnecessary timed waiting.  When a thread becomes
+     * the leader, it waits only for the next delay to elapse, but
+     * other threads await indefinitely.  The leader thread must
+     * signal some other thread before returning from take() or
+     * poll(...), unless some other thread becomes leader in the
+     * interim.  Whenever the head of the queue is replaced with
+     * an element with an earlier expiration time, the leader
+     * field is invalidated by being reset to null, and some
+     * waiting thread, but not necessarily the current leader, is
+     * signalled.  So waiting threads must be prepared to acquire
+     * and lose leadership while waiting.
+     */
+    private Thread leader = null;
+
+    /**
+     * Condition signalled when a newer element becomes available
+     * at the head of the queue or a new thread may need to
+     * become leader.
+     */
+    private final Condition available = lock.newCondition();
+
+    /**
      * Creates a new <tt>DelayQueue</tt> that is initially empty.
      */
     public DelayQueue() {}
@@ -111,10 +135,11 @@
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
-            E first = q.peek();
             q.offer(e);
-            if (first == null || e.compareTo(first) < 0)
-                available.signalAll();
+            if (q.peek() == e) {
+                leader = null;
+                available.signal();
+            }
             return true;
         } finally {
             lock.unlock();
@@ -160,13 +185,8 @@
             E first = q.peek();
             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                 return null;
-            else {
-                E x = q.poll();
-                assert x != null;
-                if (q.size() != 0)
-                    available.signalAll();
-                return x;
-            }
+            else
+                return q.poll();
         } finally {
             lock.unlock();
         }
@@ -185,23 +205,29 @@
         try {
             for (;;) {
                 E first = q.peek();
-                if (first == null) {
+                if (first == null)
                     available.await();
-                } else {
-                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
-                    if (delay > 0) {
-                        long tl = available.awaitNanos(delay);
-                    } else {
-                        E x = q.poll();
-                        assert x != null;
-                        if (q.size() != 0)
-                            available.signalAll(); // wake up other takers
-                        return x;
-
+                else {
+                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
+                    if (delay <= 0)
+                        return q.poll();
+                    else if (leader != null)
+                        available.await();
+                    else {
+                        Thread thisThread = Thread.currentThread();
+                        leader = thisThread;
+                        try {
+                            available.awaitNanos(delay);
+                        } finally {
+                            if (leader == thisThread)
+                                leader = null;
+                        }
                     }
                 }
             }
         } finally {
+            if (leader == null && q.peek() != null)
+                available.signal();
             lock.unlock();
         }
     }
@@ -230,23 +256,28 @@
                         nanos = available.awaitNanos(nanos);
                 } else {
                     long delay = first.getDelay(TimeUnit.NANOSECONDS);
-                    if (delay > 0) {
-                        if (nanos <= 0)
-                            return null;
-                        if (delay > nanos)
-                            delay = nanos;
-                        long timeLeft = available.awaitNanos(delay);
-                        nanos -= delay - timeLeft;
-                    } else {
-                        E x = q.poll();
-                        assert x != null;
-                        if (q.size() != 0)
-                            available.signalAll();
-                        return x;
+                    if (delay <= 0)
+                        return q.poll();
+                    if (nanos <= 0)
+                        return null;
+                    if (nanos < delay || leader != null)
+                        nanos = available.awaitNanos(nanos);
+                    else {
+                        Thread thisThread = Thread.currentThread();
+                        leader = thisThread;
+                        try {
+                            long timeLeft = available.awaitNanos(delay);
+                            nanos -= delay - timeLeft;
+                        } finally {
+                            if (leader == thisThread)
+                                leader = null;
+                        }
                     }
                 }
             }
         } finally {
+            if (leader == null && q.peek() != null)
+                available.signal();
             lock.unlock();
         }
     }
@@ -303,8 +334,6 @@
                 c.add(q.poll());
                 ++n;
             }
-            if (n > 0)
-                available.signalAll();
             return n;
         } finally {
             lock.unlock();
@@ -335,8 +364,6 @@
                 c.add(q.poll());
                 ++n;
             }
-            if (n > 0)
-                available.signalAll();
             return n;
         } finally {
             lock.unlock();
@@ -485,6 +512,7 @@
             return cursor < array.length;
         }
 
+        @SuppressWarnings("unchecked")
         public E next() {
             if (cursor >= array.length)
                 throw new NoSuchElementException();