# HG changeset patch # User martin # Date 1205216627 25200 # Node ID f35c86a7e82dcb854c8fe1f594059c2e678f374a # Parent 55e9cd9ade0beec1406639f505c81f9f95e0f7ce 6602600: Fast removal of cancelled scheduled thread pool tasks Reviewed-by: alanb Contributed-by: Doug Lea diff -r 55e9cd9ade0b -r f35c86a7e82d jdk/src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java --- a/jdk/src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java Mon Mar 10 23:23:47 2008 -0700 +++ b/jdk/src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java Mon Mar 10 23:23:47 2008 -0700 @@ -35,6 +35,7 @@ package java.util.concurrent; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; import java.util.*; /** @@ -45,12 +46,21 @@ * flexibility or capabilities of {@link ThreadPoolExecutor} (which * this class extends) are required. * - *

Delayed tasks execute no sooner than they are enabled, but + *

Delayed tasks execute no sooner than they are enabled, but * without any real-time guarantees about when, after they are * enabled, they will commence. Tasks scheduled for exactly the same * execution time are enabled in first-in-first-out (FIFO) order of * submission. * + *

When a submitted task is cancelled before it is run, execution + * is suppressed. By default, such a cancelled task is not + * automatically removed from the work queue until its delay + * elapses. While this enables further inspection and monitoring, it + * may also cause unbounded retention of cancelled tasks. To avoid + * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which + * causes tasks to be immediately removed from the work queue at + * time of cancellation. + * *

While this class inherits from {@link ThreadPoolExecutor}, a few * of the inherited tuning methods are not useful for it. In * particular, because it acts as a fixed-sized pool using @@ -111,21 +121,11 @@ * ScheduledExecutorService methods) which are treated as * delayed tasks with a delay of zero. * - * 2. Using a custom queue (DelayedWorkQueue) based on an + * 2. Using a custom queue (DelayedWorkQueue), a variant of * unbounded DelayQueue. The lack of capacity constraint and * the fact that corePoolSize and maximumPoolSize are * effectively identical simplifies some execution mechanics - * (see delayedExecute) compared to ThreadPoolExecutor - * version. - * - * The DelayedWorkQueue class is defined below for the sake of - * ensuring that all elements are instances of - * RunnableScheduledFuture. Since DelayQueue otherwise - * requires type be Delayed, but not necessarily Runnable, and - * the workQueue requires the opposite, we need to explicitly - * define a class that requires both to ensure that users don't - * add objects that aren't RunnableScheduledFutures via - * getQueue().add() etc. + * (see delayedExecute) compared to ThreadPoolExecutor. * * 3. Supporting optional run-after-shutdown parameters, which * leads to overrides of shutdown methods to remove and cancel @@ -150,6 +150,11 @@ private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /** + * True if ScheduledFutureTask.cancel should remove from queue + */ + private volatile boolean removeOnCancel = false; + + /** * Sequence number to break scheduling ties, and in turn to * guarantee FIFO order among tied entries. */ @@ -167,8 +172,10 @@ /** Sequence number to break ties FIFO */ private final long sequenceNumber; + /** The time the task is enabled to execute in nanoTime units */ private long time; + /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value @@ -181,6 +188,11 @@ RunnableScheduledFuture outerTask = this; /** + * Index into delay queue, to support faster cancellation. + */ + int heapIndex; + + /** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { @@ -255,6 +267,13 @@ time = now() - p; } + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = super.cancel(mayInterruptIfRunning); + if (cancelled && removeOnCancel && heapIndex >= 0) + remove(this); + return cancelled; + } + /** * Overrides FutureTask version so as to reset/requeue if periodic. */ @@ -655,6 +674,33 @@ } /** + * Sets the policy on whether cancelled tasks should be immediately + * removed from the work queue at time of cancellation. This value is + * by default {@code false}. + * + * @param value if {@code true}, remove on cancellation, else don't + * @see #getRemoveOnCancelPolicy + * @since 1.7 + */ + public void setRemoveOnCancelPolicy(boolean value) { + removeOnCancel = value; + } + + /** + * Gets the policy on whether cancelled tasks should be immediately + * removed from the work queue at time of cancellation. This value is + * by default {@code false}. + * + * @return {@code true} if cancelled tasks are immediately removed + * from the queue + * @see #setRemoveOnCancelPolicy + * @since 1.7 + */ + public boolean getRemoveOnCancelPolicy() { + return removeOnCancel; + } + + /** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. If the * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has @@ -707,56 +753,478 @@ } /** - * An annoying wrapper class to convince javac to use a - * DelayQueue as a BlockingQueue + * Specialized delay queue. To mesh with TPE declarations, this + * class must be declared as a BlockingQueue even though + * it can only hold RunnableScheduledFutures. */ - private static class DelayedWorkQueue - extends AbstractCollection + static class DelayedWorkQueue extends AbstractQueue implements BlockingQueue { - private final DelayQueue dq = new DelayQueue(); - public Runnable poll() { return dq.poll(); } - public Runnable peek() { return dq.peek(); } - public Runnable take() throws InterruptedException { return dq.take(); } - public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { - return dq.poll(timeout, unit); + /* + * A DelayedWorkQueue is based on a heap-based data structure + * like those in DelayQueue and PriorityQueue, except that + * every ScheduledFutureTask also records its index into the + * heap array. This eliminates the need to find a task upon + * cancellation, greatly speeding up removal (down from O(n) + * to O(log n)), and reducing garbage retention that would + * otherwise occur by waiting for the element to rise to top + * before clearing. But because the queue may also hold + * RunnableScheduledFutures that are not ScheduledFutureTasks, + * we are not guaranteed to have such indices available, in + * which case we fall back to linear search. (We expect that + * most tasks will not be decorated, and that the faster cases + * will be much more common.) + * + * All heap operations must record index changes -- mainly + * within siftUp and siftDown. Upon removal, a task's + * heapIndex is set to -1. Note that ScheduledFutureTasks can + * appear at most once in the queue (this need not be true for + * other kinds of tasks or work queues), so are uniquely + * identified by heapIndex. + */ + + private static final int INITIAL_CAPACITY = 16; + private RunnableScheduledFuture[] queue = + new RunnableScheduledFuture[INITIAL_CAPACITY]; + private final ReentrantLock lock = new ReentrantLock(); + private int size = 0; + + /** + * Thread designated to wait for the task at the head of the + * queue. This variant of the Leader-Follower pattern + * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to + * minimize unnecessary timed waiting. When a thread becomes + * the leader, it waits only for the next delay to elapse, but + * other threads await indefinitely. The leader thread must + * signal some other thread before returning from take() or + * poll(...), unless some other thread becomes leader in the + * interim. Whenever the head of the queue is replaced with a + * task with an earlier expiration time, the leader field is + * invalidated by being reset to null, and some waiting + * thread, but not necessarily the current leader, is + * signalled. So waiting threads must be prepared to acquire + * and lose leadership while waiting. + */ + private Thread leader = null; + + /** + * Condition signalled when a newer task becomes available at the + * head of the queue or a new thread may need to become leader. + */ + private final Condition available = lock.newCondition(); + + /** + * Set f's heapIndex if it is a ScheduledFutureTask. + */ + private void setIndex(RunnableScheduledFuture f, int idx) { + if (f instanceof ScheduledFutureTask) + ((ScheduledFutureTask)f).heapIndex = idx; + } + + /** + * Sift element added at bottom up to its heap-ordered spot. + * Call only when holding lock. + */ + private void siftUp(int k, RunnableScheduledFuture key) { + while (k > 0) { + int parent = (k - 1) >>> 1; + RunnableScheduledFuture e = queue[parent]; + if (key.compareTo(e) >= 0) + break; + queue[k] = e; + setIndex(e, k); + k = parent; + } + queue[k] = key; + setIndex(key, k); + } + + /** + * Sift element added at top down to its heap-ordered spot. + * Call only when holding lock. + */ + private void siftDown(int k, RunnableScheduledFuture key) { + int half = size >>> 1; + while (k < half) { + int child = (k << 1) + 1; + RunnableScheduledFuture c = queue[child]; + int right = child + 1; + if (right < size && c.compareTo(queue[right]) > 0) + c = queue[child = right]; + if (key.compareTo(c) <= 0) + break; + queue[k] = c; + setIndex(c, k); + k = child; + } + queue[k] = key; + setIndex(key, k); + } + + /** + * Resize the heap array. Call only when holding lock. + */ + private void grow() { + int oldCapacity = queue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% + if (newCapacity < 0) // overflow + newCapacity = Integer.MAX_VALUE; + queue = Arrays.copyOf(queue, newCapacity); + } + + /** + * Find index of given object, or -1 if absent + */ + private int indexOf(Object x) { + if (x != null) { + if (x instanceof ScheduledFutureTask) { + int i = ((ScheduledFutureTask) x).heapIndex; + // Sanity check; x could conceivably be a + // ScheduledFutureTask from some other pool. + if (i >= 0 && i < size && queue[i] == x) + return i; + } else { + for (int i = 0; i < size; i++) + if (x.equals(queue[i])) + return i; + } + } + return -1; + } + + public boolean contains(Object x) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return indexOf(x) != -1; + } finally { + lock.unlock(); + } + } + + public boolean remove(Object x) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int i = indexOf(x); + if (i < 0) + return false; + + setIndex(queue[i], -1); + int s = --size; + RunnableScheduledFuture replacement = queue[s]; + queue[s] = null; + if (s != i) { + siftDown(i, replacement); + if (queue[i] == replacement) + siftUp(i, replacement); + } + return true; + } finally { + lock.unlock(); + } + } + + public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return size; + } finally { + lock.unlock(); + } + } + + public boolean isEmpty() { + return size() == 0; + } + + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + public RunnableScheduledFuture peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return queue[0]; + } finally { + lock.unlock(); + } + } + + public boolean offer(Runnable x) { + if (x == null) + throw new NullPointerException(); + RunnableScheduledFuture e = (RunnableScheduledFuture)x; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int i = size; + if (i >= queue.length) + grow(); + size = i + 1; + if (i == 0) { + queue[0] = e; + setIndex(e, 0); + } else { + siftUp(i, e); + } + if (queue[0] == e) { + leader = null; + available.signal(); + } + } finally { + lock.unlock(); + } + return true; + } + + public void put(Runnable e) { + offer(e); + } + + public boolean add(Runnable e) { + return offer(e); } - public boolean add(Runnable x) { - return dq.add((RunnableScheduledFuture)x); + public boolean offer(Runnable e, long timeout, TimeUnit unit) { + return offer(e); } - public boolean offer(Runnable x) { - return dq.offer((RunnableScheduledFuture)x); + + /** + * Performs common bookkeeping for poll and take: Replaces + * first element with last and sifts it down. Call only when + * holding lock. + * @param f the task to remove and return + */ + private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { + int s = --size; + RunnableScheduledFuture x = queue[s]; + queue[s] = null; + if (s != 0) + siftDown(0, x); + setIndex(f, -1); + return f; + } + + public RunnableScheduledFuture poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + RunnableScheduledFuture first = queue[0]; + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) + return null; + else + return finishPoll(first); + } finally { + lock.unlock(); + } } - public void put(Runnable x) { - dq.put((RunnableScheduledFuture)x); + + public RunnableScheduledFuture take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + RunnableScheduledFuture first = queue[0]; + if (first == null) + available.await(); + else { + long delay = first.getDelay(TimeUnit.NANOSECONDS); + if (delay <= 0) + return finishPoll(first); + else if (leader != null) + available.await(); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && queue[0] != null) + available.signal(); + lock.unlock(); + } } - public boolean offer(Runnable x, long timeout, TimeUnit unit) { - return dq.offer((RunnableScheduledFuture)x, timeout, unit); + + public RunnableScheduledFuture poll(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + RunnableScheduledFuture first = queue[0]; + if (first == null) { + if (nanos <= 0) + return null; + else + nanos = available.awaitNanos(nanos); + } else { + long delay = first.getDelay(TimeUnit.NANOSECONDS); + if (delay <= 0) + return finishPoll(first); + if (nanos <= 0) + return null; + if (nanos < delay || leader != null) + nanos = available.awaitNanos(nanos); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + long timeLeft = available.awaitNanos(delay); + nanos -= delay - timeLeft; + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && queue[0] != null) + available.signal(); + lock.unlock(); + } } - public Runnable remove() { return dq.remove(); } - public Runnable element() { return dq.element(); } - public void clear() { dq.clear(); } - public int drainTo(Collection c) { return dq.drainTo(c); } - public int drainTo(Collection c, int maxElements) { - return dq.drainTo(c, maxElements); + public void clear() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + for (int i = 0; i < size; i++) { + RunnableScheduledFuture t = queue[i]; + if (t != null) { + queue[i] = null; + setIndex(t, -1); + } + } + size = 0; + } finally { + lock.unlock(); + } + } + + /** + * Return and remove first element only if it is expired. + * Used only by drainTo. Call only when holding lock. + */ + private RunnableScheduledFuture pollExpired() { + RunnableScheduledFuture first = queue[0]; + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) + return null; + return finishPoll(first); + } + + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + RunnableScheduledFuture first; + int n = 0; + while ((first = pollExpired()) != null) { + c.add(first); + ++n; + } + return n; + } finally { + lock.unlock(); + } } - public int remainingCapacity() { return dq.remainingCapacity(); } - public boolean remove(Object x) { return dq.remove(x); } - public boolean contains(Object x) { return dq.contains(x); } - public int size() { return dq.size(); } - public boolean isEmpty() { return dq.isEmpty(); } - public Object[] toArray() { return dq.toArray(); } - public T[] toArray(T[] array) { return dq.toArray(array); } + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + if (maxElements <= 0) + return 0; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + RunnableScheduledFuture first; + int n = 0; + while (n < maxElements && (first = pollExpired()) != null) { + c.add(first); + ++n; + } + return n; + } finally { + lock.unlock(); + } + } + + public Object[] toArray() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return Arrays.copyOf(queue, size, Object[].class); + } finally { + lock.unlock(); + } + } + + @SuppressWarnings("unchecked") + public T[] toArray(T[] a) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (a.length < size) + return (T[]) Arrays.copyOf(queue, size, a.getClass()); + System.arraycopy(queue, 0, a, 0, size); + if (a.length > size) + a[size] = null; + return a; + } finally { + lock.unlock(); + } + } + public Iterator iterator() { - return new Iterator() { - private Iterator it = dq.iterator(); - public boolean hasNext() { return it.hasNext(); } - public Runnable next() { return it.next(); } - public void remove() { it.remove(); } - }; + return new Itr(Arrays.copyOf(queue, size)); + } + + /** + * Snapshot iterator that works off copy of underlying q array. + */ + private class Itr implements Iterator { + final RunnableScheduledFuture[] array; + int cursor = 0; // index of next element to return + int lastRet = -1; // index of last element, or -1 if no such + + Itr(RunnableScheduledFuture[] array) { + this.array = array; + } + + public boolean hasNext() { + return cursor < array.length; + } + + public Runnable next() { + if (cursor >= array.length) + throw new NoSuchElementException(); + lastRet = cursor; + return array[cursor++]; + } + + public void remove() { + if (lastRet < 0) + throw new IllegalStateException(); + DelayedWorkQueue.this.remove(array[lastRet]); + lastRet = -1; + } } } } diff -r 55e9cd9ade0b -r f35c86a7e82d jdk/test/java/util/concurrent/ScheduledThreadPoolExecutor/BasicCancelTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/util/concurrent/ScheduledThreadPoolExecutor/BasicCancelTest.java Mon Mar 10 23:23:47 2008 -0700 @@ -0,0 +1,113 @@ +/* + * Copyright 2008 Sun Microsystems, Inc. All Rights Reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * @test + * @bug 6602600 + * @run main/othervm -Xmx8m BasicCancelTest + * @summary Check effectiveness of RemoveOnCancelPolicy + */ + +import java.util.concurrent.*; +import java.util.Random; + +/** + * Simple timer cancellation test. Submits tasks to a scheduled executor + * service and immediately cancels them. + */ +public class BasicCancelTest { + + void checkShutdown(final ExecutorService es) { + final Runnable nop = new Runnable() {public void run() {}}; + try { + if (new Random().nextBoolean()) { + check(es.isShutdown()); + if (es instanceof ThreadPoolExecutor) + check(((ThreadPoolExecutor) es).isTerminating() + || es.isTerminated()); + THROWS(RejectedExecutionException.class, + new F(){void f(){es.execute(nop);}}); + } + } catch (Throwable t) { unexpected(t); } + } + + void checkTerminated(final ThreadPoolExecutor tpe) { + try { + checkShutdown(tpe); + check(tpe.getQueue().isEmpty()); + check(tpe.isTerminated()); + check(! tpe.isTerminating()); + equal(tpe.getActiveCount(), 0); + equal(tpe.getPoolSize(), 0); + equal(tpe.getTaskCount(), tpe.getCompletedTaskCount()); + check(tpe.awaitTermination(0, TimeUnit.SECONDS)); + } catch (Throwable t) { unexpected(t); } + } + + void test(String[] args) throws Throwable { + + final ScheduledThreadPoolExecutor pool = + new ScheduledThreadPoolExecutor(1); + + // Needed to avoid OOME + pool.setRemoveOnCancelPolicy(true); + + final long moreThanYouCanChew = Runtime.getRuntime().freeMemory() / 4; + System.out.printf("moreThanYouCanChew=%d%n", moreThanYouCanChew); + + Runnable noopTask = new Runnable() { public void run() {}}; + + for (long i = 0; i < moreThanYouCanChew; i++) + pool.schedule(noopTask, 10, TimeUnit.MINUTES).cancel(true); + + pool.shutdown(); + check(pool.awaitTermination(1L, TimeUnit.DAYS)); + checkTerminated(pool); + equal(pool.getTaskCount(), 0L); + equal(pool.getCompletedTaskCount(), 0L); + } + + //--------------------- Infrastructure --------------------------- + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {System.err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void equal(Object x, Object y) { + if (x == null ? y == null : x.equals(y)) pass(); + else fail(x + " not equal to " + y);} + public static void main(String[] args) throws Throwable { + new BasicCancelTest().instanceMain(args);} + void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} + System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} + abstract class F {abstract void f() throws Throwable;} + void THROWS(Class k, F... fs) { + for (F f : fs) + try {f.f(); fail("Expected " + k.getName() + " not thrown");} + catch (Throwable t) { + if (k.isAssignableFrom(t.getClass())) pass(); + else unexpected(t);}} +} diff -r 55e9cd9ade0b -r f35c86a7e82d jdk/test/java/util/concurrent/ScheduledThreadPoolExecutor/Stress.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/util/concurrent/ScheduledThreadPoolExecutor/Stress.java Mon Mar 10 23:23:47 2008 -0700 @@ -0,0 +1,54 @@ +/* + * Copyright 2008 Sun Microsystems, Inc. All Rights Reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +import java.util.concurrent.*; + +/** + * This is not a regression test, but a stress benchmark test for + * 6602600: Fast removal of cancelled scheduled thread pool tasks + * + * This runs in the same wall clock time, but much reduced cpu time, + * with the changes for 6602600. + */ +public class Stress { + + public static void main(String[] args) throws Throwable { + + final CountDownLatch count = new CountDownLatch(1000); + + final ScheduledThreadPoolExecutor pool = + new ScheduledThreadPoolExecutor(100); + pool.prestartAllCoreThreads(); + + final Runnable incTask = new Runnable() { public void run() { + count.countDown(); + }}; + + pool.scheduleAtFixedRate(incTask, 0, 10, TimeUnit.MILLISECONDS); + + count.await(); + + pool.shutdown(); + pool.awaitTermination(1L, TimeUnit.DAYS); + } +}