diff -r fd16c54261b3 -r 90ce3da70b43 jdk/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java Sat Dec 01 00:00:00 2007 +0000 @@ -0,0 +1,2007 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Sun designates this + * particular file as subject to the "Classpath" exception as provided + * by Sun in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.locks.*; +import java.util.concurrent.atomic.*; +import java.util.*; + +/** + * An {@link ExecutorService} that executes each submitted task using + * one of possibly several pooled threads, normally configured + * using {@link Executors} factory methods. + * + *
Thread pools address two different problems: they usually + * provide improved performance when executing large numbers of + * asynchronous tasks, due to reduced per-task invocation overhead, + * and they provide a means of bounding and managing the resources, + * including threads, consumed when executing a collection of tasks. + * Each {@code ThreadPoolExecutor} also maintains some basic + * statistics, such as the number of completed tasks. + * + *
To be useful across a wide range of contexts, this class + * provides many adjustable parameters and extensibility + * hooks. However, programmers are urged to use the more convenient + * {@link Executors} factory methods {@link + * Executors#newCachedThreadPool} (unbounded thread pool, with + * automatic thread reclamation), {@link Executors#newFixedThreadPool} + * (fixed size thread pool) and {@link + * Executors#newSingleThreadExecutor} (single background thread), that + * preconfigure settings for the most common usage + * scenarios. Otherwise, use the following guide when manually + * configuring and tuning this class: + * + *
If hook or callback methods throw exceptions, internal worker + * threads may in turn fail and abruptly terminate.
Extension example. Most extensions of this class + * override one or more of the protected hook methods. For example, + * here is a subclass that adds a simple pause/resume feature: + * + *
{@code + * class PausableThreadPoolExecutor extends ThreadPoolExecutor { + * private boolean isPaused; + * private ReentrantLock pauseLock = new ReentrantLock(); + * private Condition unpaused = pauseLock.newCondition(); + * + * public PausableThreadPoolExecutor(...) { super(...); } + * + * protected void beforeExecute(Thread t, Runnable r) { + * super.beforeExecute(t, r); + * pauseLock.lock(); + * try { + * while (isPaused) unpaused.await(); + * } catch (InterruptedException ie) { + * t.interrupt(); + * } finally { + * pauseLock.unlock(); + * } + * } + * + * public void pause() { + * pauseLock.lock(); + * try { + * isPaused = true; + * } finally { + * pauseLock.unlock(); + * } + * } + * + * public void resume() { + * pauseLock.lock(); + * try { + * isPaused = false; + * unpaused.signalAll(); + * } finally { + * pauseLock.unlock(); + * } + * } + * }}+ * + * @since 1.5 + * @author Doug Lea + */ +public class ThreadPoolExecutor extends AbstractExecutorService { + /** + * The main pool control state, ctl, is an atomic integer packing + * two conceptual fields + * workerCount, indicating the effective number of threads + * runState, indicating whether running, shutting down etc + * + * In order to pack them into one int, we limit workerCount to + * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 + * billion) otherwise representable. If this is ever an issue in + * the future, the variable can be changed to be an AtomicLong, + * and the shift/mask constants below adjusted. But until the need + * arises, this code is a bit faster and simpler using an int. + * + * The workerCount is the number of workers that have been + * permitted to start and not permitted to stop. The value may be + * transiently different from the actual number of live threads, + * for example when a ThreadFactory fails to create a thread when + * asked, and when exiting threads are still performing + * bookkeeping before terminating. The user-visible pool size is + * reported as the current size of the workers set. + * + * The runState provides the main lifecyle control, taking on values: + * + * RUNNING: Accept new tasks and process queued tasks + * SHUTDOWN: Don't accept new tasks, but process queued tasks + * STOP: Don't accept new tasks, don't process queued tasks, + * and interrupt in-progress tasks + * TIDYING: All tasks have terminated, workerCount is zero, + * the thread transitioning to state TIDYING + * will run the terminated() hook method + * TERMINATED: terminated() has completed + * + * The numerical order among these values matters, to allow + * ordered comparisons. The runState monotonically increases over + * time, but need not hit each state. The transitions are: + * + * RUNNING -> SHUTDOWN + * On invocation of shutdown(), perhaps implicitly in finalize() + * (RUNNING or SHUTDOWN) -> STOP + * On invocation of shutdownNow() + * SHUTDOWN -> TIDYING + * When both queue and pool are empty + * STOP -> TIDYING + * When pool is empty + * TIDYING -> TERMINATED + * When the terminated() hook method has completed + * + * Threads waiting in awaitTermination() will return when the + * state reaches TERMINATED. + * + * Detecting the transition from SHUTDOWN to TIDYING is less + * straightforward than you'd like because the queue may become + * empty after non-empty and vice versa during SHUTDOWN state, but + * we can only terminate if, after seeing that it is empty, we see + * that workerCount is 0 (which sometimes entails a recheck -- see + * below). + */ + private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); + private static final int COUNT_BITS = Integer.SIZE - 3; + private static final int CAPACITY = (1 << COUNT_BITS) - 1; + + // runState is stored in the high-order bits + private static final int RUNNING = -1 << COUNT_BITS; + private static final int SHUTDOWN = 0 << COUNT_BITS; + private static final int STOP = 1 << COUNT_BITS; + private static final int TIDYING = 2 << COUNT_BITS; + private static final int TERMINATED = 3 << COUNT_BITS; + + // Packing and unpacking ctl + private static int runStateOf(int c) { return c & ~CAPACITY; } + private static int workerCountOf(int c) { return c & CAPACITY; } + private static int ctlOf(int rs, int wc) { return rs | wc; } + + /* + * Bit field accessors that don't require unpacking ctl. + * These depend on the bit layout and on workerCount being never negative. + */ + + private static boolean runStateLessThan(int c, int s) { + return c < s; + } + + private static boolean runStateAtLeast(int c, int s) { + return c >= s; + } + + private static boolean isRunning(int c) { + return c < SHUTDOWN; + } + + /** + * Attempt to CAS-increment the workerCount field of ctl. + */ + private boolean compareAndIncrementWorkerCount(int expect) { + return ctl.compareAndSet(expect, expect + 1); + } + + /** + * Attempt to CAS-decrement the workerCount field of ctl. + */ + private boolean compareAndDecrementWorkerCount(int expect) { + return ctl.compareAndSet(expect, expect - 1); + } + + /** + * Decrements the workerCount field of ctl. This is called only on + * abrupt termination of a thread (see processWorkerExit). Other + * decrements are performed within getTask. + */ + private void decrementWorkerCount() { + do {} while (! compareAndDecrementWorkerCount(ctl.get())); + } + + /** + * The queue used for holding tasks and handing off to worker + * threads. We do not require that workQueue.poll() returning + * null necessarily means that workQueue.isEmpty(), so rely + * solely on isEmpty to see if the queue is empty (which we must + * do for example when deciding whether to transition from + * SHUTDOWN to TIDYING). This accommodates special-purpose + * queues such as DelayQueues for which poll() is allowed to + * return null even if it may later return non-null when delays + * expire. + */ + private final BlockingQueue
There are no guarantees beyond best-effort attempts to stop
+ * processing actively executing tasks. This implementation
+ * cancels tasks via {@link Thread#interrupt}, so any task that
+ * fails to respond to interrupts may never terminate.
+ *
+ * @throws SecurityException {@inheritDoc}
+ */
+ public List This method may be useful as one part of a cancellation
+ * scheme. It may fail to remove tasks that have been converted
+ * into other forms before being placed on the internal queue. For
+ * example, a task entered using {@code submit} might be
+ * converted into a form that maintains {@code Future} status.
+ * However, in such cases, method {@link #purge} may be used to
+ * remove those Futures that have been cancelled.
+ *
+ * @param task the task to remove
+ * @return true if the task was removed
+ */
+ public boolean remove(Runnable task) {
+ boolean removed = workQueue.remove(task);
+ tryTerminate(); // In case SHUTDOWN and now empty
+ return removed;
+ }
+
+ /**
+ * Tries to remove from the work queue all {@link Future}
+ * tasks that have been cancelled. This method can be useful as a
+ * storage reclamation operation, that has no other impact on
+ * functionality. Cancelled tasks are never executed, but may
+ * accumulate in work queues until worker threads can actively
+ * remove them. Invoking this method instead tries to remove them now.
+ * However, this method may fail to remove tasks in
+ * the presence of interference by other threads.
+ */
+ public void purge() {
+ final BlockingQueue This implementation does nothing, but may be customized in
+ * subclasses. Note: To properly nest multiple overridings, subclasses
+ * should generally invoke {@code super.beforeExecute} at the end of
+ * this method.
+ *
+ * @param t the thread that will run task {@code r}
+ * @param r the task that will be executed
+ */
+ protected void beforeExecute(Thread t, Runnable r) { }
+
+ /**
+ * Method invoked upon completion of execution of the given Runnable.
+ * This method is invoked by the thread that executed the task. If
+ * non-null, the Throwable is the uncaught {@code RuntimeException}
+ * or {@code Error} that caused execution to terminate abruptly.
+ *
+ * This implementation does nothing, but may be customized in
+ * subclasses. Note: To properly nest multiple overridings, subclasses
+ * should generally invoke {@code super.afterExecute} at the
+ * beginning of this method.
+ *
+ * Note: When actions are enclosed in tasks (such as
+ * {@link FutureTask}) either explicitly or via methods such as
+ * {@code submit}, these task objects catch and maintain
+ * computational exceptions, and so they do not cause abrupt
+ * termination, and the internal exceptions are not
+ * passed to this method. If you would like to trap both kinds of
+ * failures in this method, you can further probe for such cases,
+ * as in this sample subclass that prints either the direct cause
+ * or the underlying exception if a task has been aborted:
+ *
+ * {@code
+ * class ExtendedExecutor extends ThreadPoolExecutor {
+ * // ...
+ * protected void afterExecute(Runnable r, Throwable t) {
+ * super.afterExecute(r, t);
+ * if (t == null && r instanceof Future>) {
+ * try {
+ * Object result = ((Future>) r).get();
+ * } catch (CancellationException ce) {
+ * t = ce;
+ * } catch (ExecutionException ee) {
+ * t = ee.getCause();
+ * } catch (InterruptedException ie) {
+ * Thread.currentThread().interrupt(); // ignore/reset
+ * }
+ * }
+ * if (t != null)
+ * System.out.println(t);
+ * }
+ * }}
+ *
+ * @param r the runnable that has completed
+ * @param t the exception that caused termination, or null if
+ * execution completed normally
+ */
+ protected void afterExecute(Runnable r, Throwable t) { }
+
+ /**
+ * Method invoked when the Executor has terminated. Default
+ * implementation does nothing. Note: To properly nest multiple
+ * overridings, subclasses should generally invoke
+ * {@code super.terminated} within this method.
+ */
+ protected void terminated() { }
+
+ /* Predefined RejectedExecutionHandlers */
+
+ /**
+ * A handler for rejected tasks that runs the rejected task
+ * directly in the calling thread of the {@code execute} method,
+ * unless the executor has been shut down, in which case the task
+ * is discarded.
+ */
+ public static class CallerRunsPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a {@code CallerRunsPolicy}.
+ */
+ public CallerRunsPolicy() { }
+
+ /**
+ * Executes task r in the caller's thread, unless the executor
+ * has been shut down, in which case the task is discarded.
+ *
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ r.run();
+ }
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that throws a
+ * {@code RejectedExecutionException}.
+ */
+ public static class AbortPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates an {@code AbortPolicy}.
+ */
+ public AbortPolicy() { }
+
+ /**
+ * Always throws RejectedExecutionException.
+ *
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ * @throws RejectedExecutionException always.
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that silently discards the
+ * rejected task.
+ */
+ public static class DiscardPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a {@code DiscardPolicy}.
+ */
+ public DiscardPolicy() { }
+
+ /**
+ * Does nothing, which has the effect of discarding task r.
+ *
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that discards the oldest unhandled
+ * request and then retries {@code execute}, unless the executor
+ * is shut down, in which case the task is discarded.
+ */
+ public static class DiscardOldestPolicy implements RejectedExecutionHandler {
+ /**
+ * Creates a {@code DiscardOldestPolicy} for the given executor.
+ */
+ public DiscardOldestPolicy() { }
+
+ /**
+ * Obtains and ignores the next task that the executor
+ * would otherwise execute, if one is immediately available,
+ * and then retries execution of task r, unless the executor
+ * is shut down, in which case task r is instead discarded.
+ *
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ e.getQueue().poll();
+ e.execute(r);
+ }
+ }
+ }
+}