6609775: Reduce context switches in DelayQueue due to signalAll
authormartin
Mon, 10 Mar 2008 23:23:47 -0700
changeset 60 429af02854d2
parent 59 f35c86a7e82d
child 61 5691b03db1ea
6609775: Reduce context switches in DelayQueue due to signalAll Reviewed-by: alanb Contributed-by: Doug Lea <dl@cs.oswego.edu>
jdk/src/share/classes/java/util/concurrent/DelayQueue.java
jdk/test/java/util/concurrent/DelayQueue/Stress.java
--- a/jdk/src/share/classes/java/util/concurrent/DelayQueue.java	Mon Mar 10 23:23:47 2008 -0700
+++ b/jdk/src/share/classes/java/util/concurrent/DelayQueue.java	Mon Mar 10 23:23:47 2008 -0700
@@ -69,10 +69,34 @@
     implements BlockingQueue<E> {
 
     private transient final ReentrantLock lock = new ReentrantLock();
-    private transient final Condition available = lock.newCondition();
     private final PriorityQueue<E> q = new PriorityQueue<E>();
 
     /**
+     * Thread designated to wait for the element at the head of
+     * the queue.  This variant of the Leader-Follower pattern
+     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
+     * minimize unnecessary timed waiting.  When a thread becomes
+     * the leader, it waits only for the next delay to elapse, but
+     * other threads await indefinitely.  The leader thread must
+     * signal some other thread before returning from take() or
+     * poll(...), unless some other thread becomes leader in the
+     * interim.  Whenever the head of the queue is replaced with
+     * an element with an earlier expiration time, the leader
+     * field is invalidated by being reset to null, and some
+     * waiting thread, but not necessarily the current leader, is
+     * signalled.  So waiting threads must be prepared to acquire
+     * and lose leadership while waiting.
+     */
+    private Thread leader = null;
+
+    /**
+     * Condition signalled when a newer element becomes available
+     * at the head of the queue or a new thread may need to
+     * become leader.
+     */
+    private final Condition available = lock.newCondition();
+
+    /**
      * Creates a new <tt>DelayQueue</tt> that is initially empty.
      */
     public DelayQueue() {}
@@ -111,10 +135,11 @@
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
-            E first = q.peek();
             q.offer(e);
-            if (first == null || e.compareTo(first) < 0)
-                available.signalAll();
+            if (q.peek() == e) {
+                leader = null;
+                available.signal();
+            }
             return true;
         } finally {
             lock.unlock();
@@ -160,13 +185,8 @@
             E first = q.peek();
             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                 return null;
-            else {
-                E x = q.poll();
-                assert x != null;
-                if (q.size() != 0)
-                    available.signalAll();
-                return x;
-            }
+            else
+                return q.poll();
         } finally {
             lock.unlock();
         }
@@ -185,23 +205,29 @@
         try {
             for (;;) {
                 E first = q.peek();
-                if (first == null) {
+                if (first == null)
                     available.await();
-                } else {
-                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
-                    if (delay > 0) {
-                        long tl = available.awaitNanos(delay);
-                    } else {
-                        E x = q.poll();
-                        assert x != null;
-                        if (q.size() != 0)
-                            available.signalAll(); // wake up other takers
-                        return x;
-
+                else {
+                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
+                    if (delay <= 0)
+                        return q.poll();
+                    else if (leader != null)
+                        available.await();
+                    else {
+                        Thread thisThread = Thread.currentThread();
+                        leader = thisThread;
+                        try {
+                            available.awaitNanos(delay);
+                        } finally {
+                            if (leader == thisThread)
+                                leader = null;
+                        }
                     }
                 }
             }
         } finally {
+            if (leader == null && q.peek() != null)
+                available.signal();
             lock.unlock();
         }
     }
@@ -230,23 +256,28 @@
                         nanos = available.awaitNanos(nanos);
                 } else {
                     long delay = first.getDelay(TimeUnit.NANOSECONDS);
-                    if (delay > 0) {
-                        if (nanos <= 0)
-                            return null;
-                        if (delay > nanos)
-                            delay = nanos;
-                        long timeLeft = available.awaitNanos(delay);
-                        nanos -= delay - timeLeft;
-                    } else {
-                        E x = q.poll();
-                        assert x != null;
-                        if (q.size() != 0)
-                            available.signalAll();
-                        return x;
+                    if (delay <= 0)
+                        return q.poll();
+                    if (nanos <= 0)
+                        return null;
+                    if (nanos < delay || leader != null)
+                        nanos = available.awaitNanos(nanos);
+                    else {
+                        Thread thisThread = Thread.currentThread();
+                        leader = thisThread;
+                        try {
+                            long timeLeft = available.awaitNanos(delay);
+                            nanos -= delay - timeLeft;
+                        } finally {
+                            if (leader == thisThread)
+                                leader = null;
+                        }
                     }
                 }
             }
         } finally {
+            if (leader == null && q.peek() != null)
+                available.signal();
             lock.unlock();
         }
     }
@@ -303,8 +334,6 @@
                 c.add(q.poll());
                 ++n;
             }
-            if (n > 0)
-                available.signalAll();
             return n;
         } finally {
             lock.unlock();
@@ -335,8 +364,6 @@
                 c.add(q.poll());
                 ++n;
             }
-            if (n > 0)
-                available.signalAll();
             return n;
         } finally {
             lock.unlock();
@@ -485,6 +512,7 @@
             return cursor < array.length;
         }
 
+        @SuppressWarnings("unchecked")
         public E next() {
             if (cursor >= array.length)
                 throw new NoSuchElementException();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/DelayQueue/Stress.java	Mon Mar 10 23:23:47 2008 -0700
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2008 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+import java.util.concurrent.*;
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * This is not a regression test, but a stress benchmark test for
+ * 6609775: Reduce context switches in DelayQueue due to signalAll
+ *
+ * This runs in the same wall clock time, but much reduced cpu time,
+ * with the changes for 6609775.
+ */
+public class Stress {
+
+    public static void main(String[] args) throws Throwable {
+
+        final DelayQueue<Delayed> q = new DelayQueue<Delayed>();
+        final long t0 = System.nanoTime();
+        for (long i = 0; i < 1000; i++) {
+            final long expiry = t0 + i*10L*1000L*1000L;
+            q.add(new Delayed() {
+                    public long getDelay(TimeUnit unit) {
+                        return unit.convert(expiry - System.nanoTime(),
+                                            NANOSECONDS);
+                    }
+                    public int compareTo(Delayed x) {
+                        long d = getDelay(NANOSECONDS)
+                            - x.getDelay(NANOSECONDS);
+                        return d < 0 ? -1 : d > 0 ? 1 : 0; }});
+        }
+
+        for (int i = 0; i < 300; i++)
+            new Thread() { public void run() {
+                try {
+                    while (!q.isEmpty())
+                        q.poll(10L, TimeUnit.SECONDS);
+                } catch (Throwable t) { t.printStackTrace(); }
+            }}.start();
+    }
+}