jdk/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java
changeset 2 90ce3da70b43
child 61 5691b03db1ea
equal deleted inserted replaced
0:fd16c54261b3 2:90ce3da70b43
       
     1 /*
       
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     3  *
       
     4  * This code is free software; you can redistribute it and/or modify it
       
     5  * under the terms of the GNU General Public License version 2 only, as
       
     6  * published by the Free Software Foundation.  Sun designates this
       
     7  * particular file as subject to the "Classpath" exception as provided
       
     8  * by Sun in the LICENSE file that accompanied this code.
       
     9  *
       
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    13  * version 2 for more details (a copy is included in the LICENSE file that
       
    14  * accompanied this code).
       
    15  *
       
    16  * You should have received a copy of the GNU General Public License version
       
    17  * 2 along with this work; if not, write to the Free Software Foundation,
       
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    19  *
       
    20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    21  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    22  * have any questions.
       
    23  */
       
    24 
       
    25 /*
       
    26  * This file is available under and governed by the GNU General Public
       
    27  * License version 2 only, as published by the Free Software Foundation.
       
    28  * However, the following notice accompanied the original version of this
       
    29  * file:
       
    30  *
       
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
       
    32  * Expert Group and released to the public domain, as explained at
       
    33  * http://creativecommons.org/licenses/publicdomain
       
    34  */
       
    35 
       
    36 package java.util.concurrent;
       
    37 import java.util.concurrent.locks.*;
       
    38 import java.util.concurrent.atomic.*;
       
    39 import java.util.*;
       
    40 
       
    41 /**
       
    42  * An {@link ExecutorService} that executes each submitted task using
       
    43  * one of possibly several pooled threads, normally configured
       
    44  * using {@link Executors} factory methods.
       
    45  *
       
    46  * <p>Thread pools address two different problems: they usually
       
    47  * provide improved performance when executing large numbers of
       
    48  * asynchronous tasks, due to reduced per-task invocation overhead,
       
    49  * and they provide a means of bounding and managing the resources,
       
    50  * including threads, consumed when executing a collection of tasks.
       
    51  * Each {@code ThreadPoolExecutor} also maintains some basic
       
    52  * statistics, such as the number of completed tasks.
       
    53  *
       
    54  * <p>To be useful across a wide range of contexts, this class
       
    55  * provides many adjustable parameters and extensibility
       
    56  * hooks. However, programmers are urged to use the more convenient
       
    57  * {@link Executors} factory methods {@link
       
    58  * Executors#newCachedThreadPool} (unbounded thread pool, with
       
    59  * automatic thread reclamation), {@link Executors#newFixedThreadPool}
       
    60  * (fixed size thread pool) and {@link
       
    61  * Executors#newSingleThreadExecutor} (single background thread), that
       
    62  * preconfigure settings for the most common usage
       
    63  * scenarios. Otherwise, use the following guide when manually
       
    64  * configuring and tuning this class:
       
    65  *
       
    66  * <dl>
       
    67  *
       
    68  * <dt>Core and maximum pool sizes</dt>
       
    69  *
       
    70  * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
       
    71  * pool size (see {@link #getPoolSize})
       
    72  * according to the bounds set by
       
    73  * corePoolSize (see {@link #getCorePoolSize}) and
       
    74  * maximumPoolSize (see {@link #getMaximumPoolSize}).
       
    75  *
       
    76  * When a new task is submitted in method {@link #execute}, and fewer
       
    77  * than corePoolSize threads are running, a new thread is created to
       
    78  * handle the request, even if other worker threads are idle.  If
       
    79  * there are more than corePoolSize but less than maximumPoolSize
       
    80  * threads running, a new thread will be created only if the queue is
       
    81  * full.  By setting corePoolSize and maximumPoolSize the same, you
       
    82  * create a fixed-size thread pool. By setting maximumPoolSize to an
       
    83  * essentially unbounded value such as {@code Integer.MAX_VALUE}, you
       
    84  * allow the pool to accommodate an arbitrary number of concurrent
       
    85  * tasks. Most typically, core and maximum pool sizes are set only
       
    86  * upon construction, but they may also be changed dynamically using
       
    87  * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
       
    88  *
       
    89  * <dt>On-demand construction</dt>
       
    90  *
       
    91  * <dd> By default, even core threads are initially created and
       
    92  * started only when new tasks arrive, but this can be overridden
       
    93  * dynamically using method {@link #prestartCoreThread} or {@link
       
    94  * #prestartAllCoreThreads}.  You probably want to prestart threads if
       
    95  * you construct the pool with a non-empty queue. </dd>
       
    96  *
       
    97  * <dt>Creating new threads</dt>
       
    98  *
       
    99  * <dd>New threads are created using a {@link ThreadFactory}.  If not
       
   100  * otherwise specified, a {@link Executors#defaultThreadFactory} is
       
   101  * used, that creates threads to all be in the same {@link
       
   102  * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
       
   103  * non-daemon status. By supplying a different ThreadFactory, you can
       
   104  * alter the thread's name, thread group, priority, daemon status,
       
   105  * etc. If a {@code ThreadFactory} fails to create a thread when asked
       
   106  * by returning null from {@code newThread}, the executor will
       
   107  * continue, but might not be able to execute any tasks. Threads
       
   108  * should possess the "modifyThread" {@code RuntimePermission}. If
       
   109  * worker threads or other threads using the pool do not possess this
       
   110  * permission, service may be degraded: configuration changes may not
       
   111  * take effect in a timely manner, and a shutdown pool may remain in a
       
   112  * state in which termination is possible but not completed.</dd>
       
   113  *
       
   114  * <dt>Keep-alive times</dt>
       
   115  *
       
   116  * <dd>If the pool currently has more than corePoolSize threads,
       
   117  * excess threads will be terminated if they have been idle for more
       
   118  * than the keepAliveTime (see {@link #getKeepAliveTime}). This
       
   119  * provides a means of reducing resource consumption when the pool is
       
   120  * not being actively used. If the pool becomes more active later, new
       
   121  * threads will be constructed. This parameter can also be changed
       
   122  * dynamically using method {@link #setKeepAliveTime}. Using a value
       
   123  * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively
       
   124  * disables idle threads from ever terminating prior to shut down. By
       
   125  * default, the keep-alive policy applies only when there are more
       
   126  * than corePoolSize threads. But method {@link
       
   127  * #allowCoreThreadTimeOut(boolean)} can be used to apply this
       
   128  * time-out policy to core threads as well, so long as the
       
   129  * keepAliveTime value is non-zero. </dd>
       
   130  *
       
   131  * <dt>Queuing</dt>
       
   132  *
       
   133  * <dd>Any {@link BlockingQueue} may be used to transfer and hold
       
   134  * submitted tasks.  The use of this queue interacts with pool sizing:
       
   135  *
       
   136  * <ul>
       
   137  *
       
   138  * <li> If fewer than corePoolSize threads are running, the Executor
       
   139  * always prefers adding a new thread
       
   140  * rather than queuing.</li>
       
   141  *
       
   142  * <li> If corePoolSize or more threads are running, the Executor
       
   143  * always prefers queuing a request rather than adding a new
       
   144  * thread.</li>
       
   145  *
       
   146  * <li> If a request cannot be queued, a new thread is created unless
       
   147  * this would exceed maximumPoolSize, in which case, the task will be
       
   148  * rejected.</li>
       
   149  *
       
   150  * </ul>
       
   151  *
       
   152  * There are three general strategies for queuing:
       
   153  * <ol>
       
   154  *
       
   155  * <li> <em> Direct handoffs.</em> A good default choice for a work
       
   156  * queue is a {@link SynchronousQueue} that hands off tasks to threads
       
   157  * without otherwise holding them. Here, an attempt to queue a task
       
   158  * will fail if no threads are immediately available to run it, so a
       
   159  * new thread will be constructed. This policy avoids lockups when
       
   160  * handling sets of requests that might have internal dependencies.
       
   161  * Direct handoffs generally require unbounded maximumPoolSizes to
       
   162  * avoid rejection of new submitted tasks. This in turn admits the
       
   163  * possibility of unbounded thread growth when commands continue to
       
   164  * arrive on average faster than they can be processed.  </li>
       
   165  *
       
   166  * <li><em> Unbounded queues.</em> Using an unbounded queue (for
       
   167  * example a {@link LinkedBlockingQueue} without a predefined
       
   168  * capacity) will cause new tasks to wait in the queue when all
       
   169  * corePoolSize threads are busy. Thus, no more than corePoolSize
       
   170  * threads will ever be created. (And the value of the maximumPoolSize
       
   171  * therefore doesn't have any effect.)  This may be appropriate when
       
   172  * each task is completely independent of others, so tasks cannot
       
   173  * affect each other's execution; for example, in a web page server.
       
   174  * While this style of queuing can be useful in smoothing out
       
   175  * transient bursts of requests, it admits the possibility of
       
   176  * unbounded work queue growth when commands continue to arrive on
       
   177  * average faster than they can be processed.  </li>
       
   178  *
       
   179  * <li><em>Bounded queues.</em> A bounded queue (for example, an
       
   180  * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
       
   181  * used with finite maximumPoolSizes, but can be more difficult to
       
   182  * tune and control.  Queue sizes and maximum pool sizes may be traded
       
   183  * off for each other: Using large queues and small pools minimizes
       
   184  * CPU usage, OS resources, and context-switching overhead, but can
       
   185  * lead to artificially low throughput.  If tasks frequently block (for
       
   186  * example if they are I/O bound), a system may be able to schedule
       
   187  * time for more threads than you otherwise allow. Use of small queues
       
   188  * generally requires larger pool sizes, which keeps CPUs busier but
       
   189  * may encounter unacceptable scheduling overhead, which also
       
   190  * decreases throughput.  </li>
       
   191  *
       
   192  * </ol>
       
   193  *
       
   194  * </dd>
       
   195  *
       
   196  * <dt>Rejected tasks</dt>
       
   197  *
       
   198  * <dd> New tasks submitted in method {@link #execute} will be
       
   199  * <em>rejected</em> when the Executor has been shut down, and also
       
   200  * when the Executor uses finite bounds for both maximum threads and
       
   201  * work queue capacity, and is saturated.  In either case, the {@code
       
   202  * execute} method invokes the {@link
       
   203  * RejectedExecutionHandler#rejectedExecution} method of its {@link
       
   204  * RejectedExecutionHandler}.  Four predefined handler policies are
       
   205  * provided:
       
   206  *
       
   207  * <ol>
       
   208  *
       
   209  * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
       
   210  * handler throws a runtime {@link RejectedExecutionException} upon
       
   211  * rejection. </li>
       
   212  *
       
   213  * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
       
   214  * that invokes {@code execute} itself runs the task. This provides a
       
   215  * simple feedback control mechanism that will slow down the rate that
       
   216  * new tasks are submitted. </li>
       
   217  *
       
   218  * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
       
   219  * cannot be executed is simply dropped.  </li>
       
   220  *
       
   221  * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
       
   222  * executor is not shut down, the task at the head of the work queue
       
   223  * is dropped, and then execution is retried (which can fail again,
       
   224  * causing this to be repeated.) </li>
       
   225  *
       
   226  * </ol>
       
   227  *
       
   228  * It is possible to define and use other kinds of {@link
       
   229  * RejectedExecutionHandler} classes. Doing so requires some care
       
   230  * especially when policies are designed to work only under particular
       
   231  * capacity or queuing policies. </dd>
       
   232  *
       
   233  * <dt>Hook methods</dt>
       
   234  *
       
   235  * <dd>This class provides {@code protected} overridable {@link
       
   236  * #beforeExecute} and {@link #afterExecute} methods that are called
       
   237  * before and after execution of each task.  These can be used to
       
   238  * manipulate the execution environment; for example, reinitializing
       
   239  * ThreadLocals, gathering statistics, or adding log
       
   240  * entries. Additionally, method {@link #terminated} can be overridden
       
   241  * to perform any special processing that needs to be done once the
       
   242  * Executor has fully terminated.
       
   243  *
       
   244  * <p>If hook or callback methods throw exceptions, internal worker
       
   245  * threads may in turn fail and abruptly terminate.</dd>
       
   246  *
       
   247  * <dt>Queue maintenance</dt>
       
   248  *
       
   249  * <dd> Method {@link #getQueue} allows access to the work queue for
       
   250  * purposes of monitoring and debugging.  Use of this method for any
       
   251  * other purpose is strongly discouraged.  Two supplied methods,
       
   252  * {@link #remove} and {@link #purge} are available to assist in
       
   253  * storage reclamation when large numbers of queued tasks become
       
   254  * cancelled.</dd>
       
   255  *
       
   256  * <dt>Finalization</dt>
       
   257  *
       
   258  * <dd> A pool that is no longer referenced in a program <em>AND</em>
       
   259  * has no remaining threads will be {@code shutdown} automatically. If
       
   260  * you would like to ensure that unreferenced pools are reclaimed even
       
   261  * if users forget to call {@link #shutdown}, then you must arrange
       
   262  * that unused threads eventually die, by setting appropriate
       
   263  * keep-alive times, using a lower bound of zero core threads and/or
       
   264  * setting {@link #allowCoreThreadTimeOut(boolean)}.  </dd>
       
   265  *
       
   266  * </dl>
       
   267  *
       
   268  * <p> <b>Extension example</b>. Most extensions of this class
       
   269  * override one or more of the protected hook methods. For example,
       
   270  * here is a subclass that adds a simple pause/resume feature:
       
   271  *
       
   272  *  <pre> {@code
       
   273  * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
       
   274  *   private boolean isPaused;
       
   275  *   private ReentrantLock pauseLock = new ReentrantLock();
       
   276  *   private Condition unpaused = pauseLock.newCondition();
       
   277  *
       
   278  *   public PausableThreadPoolExecutor(...) { super(...); }
       
   279  *
       
   280  *   protected void beforeExecute(Thread t, Runnable r) {
       
   281  *     super.beforeExecute(t, r);
       
   282  *     pauseLock.lock();
       
   283  *     try {
       
   284  *       while (isPaused) unpaused.await();
       
   285  *     } catch (InterruptedException ie) {
       
   286  *       t.interrupt();
       
   287  *     } finally {
       
   288  *       pauseLock.unlock();
       
   289  *     }
       
   290  *   }
       
   291  *
       
   292  *   public void pause() {
       
   293  *     pauseLock.lock();
       
   294  *     try {
       
   295  *       isPaused = true;
       
   296  *     } finally {
       
   297  *       pauseLock.unlock();
       
   298  *     }
       
   299  *   }
       
   300  *
       
   301  *   public void resume() {
       
   302  *     pauseLock.lock();
       
   303  *     try {
       
   304  *       isPaused = false;
       
   305  *       unpaused.signalAll();
       
   306  *     } finally {
       
   307  *       pauseLock.unlock();
       
   308  *     }
       
   309  *   }
       
   310  * }}</pre>
       
   311  *
       
   312  * @since 1.5
       
   313  * @author Doug Lea
       
   314  */
       
   315 public class ThreadPoolExecutor extends AbstractExecutorService {
       
   316     /**
       
   317      * The main pool control state, ctl, is an atomic integer packing
       
   318      * two conceptual fields
       
   319      *   workerCount, indicating the effective number of threads
       
   320      *   runState,    indicating whether running, shutting down etc
       
   321      *
       
   322      * In order to pack them into one int, we limit workerCount to
       
   323      * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
       
   324      * billion) otherwise representable. If this is ever an issue in
       
   325      * the future, the variable can be changed to be an AtomicLong,
       
   326      * and the shift/mask constants below adjusted. But until the need
       
   327      * arises, this code is a bit faster and simpler using an int.
       
   328      *
       
   329      * The workerCount is the number of workers that have been
       
   330      * permitted to start and not permitted to stop.  The value may be
       
   331      * transiently different from the actual number of live threads,
       
   332      * for example when a ThreadFactory fails to create a thread when
       
   333      * asked, and when exiting threads are still performing
       
   334      * bookkeeping before terminating. The user-visible pool size is
       
   335      * reported as the current size of the workers set.
       
   336      *
       
   337      * The runState provides the main lifecycle control, taking on values:
       
   338      *
       
   339      *   RUNNING:  Accept new tasks and process queued tasks
       
   340      *   SHUTDOWN: Don't accept new tasks, but process queued tasks
       
   341      *   STOP:     Don't accept new tasks, don't process queued tasks,
       
   342      *             and interrupt in-progress tasks
       
   343      *   TIDYING:  All tasks have terminated, workerCount is zero,
       
   344      *             the thread transitioning to state TIDYING
       
   345      *             will run the terminated() hook method
       
   346      *   TERMINATED: terminated() has completed
       
   347      *
       
   348      * The numerical order among these values matters, to allow
       
   349      * ordered comparisons. The runState monotonically increases over
       
   350      * time, but need not hit each state. The transitions are:
       
   351      *
       
   352      * RUNNING -> SHUTDOWN
       
   353      *    On invocation of shutdown(), perhaps implicitly in finalize()
       
   354      * (RUNNING or SHUTDOWN) -> STOP
       
   355      *    On invocation of shutdownNow()
       
   356      * SHUTDOWN -> TIDYING
       
   357      *    When both queue and pool are empty
       
   358      * STOP -> TIDYING
       
   359      *    When pool is empty
       
   360      * TIDYING -> TERMINATED
       
   361      *    When the terminated() hook method has completed
       
   362      *
       
   363      * Threads waiting in awaitTermination() will return when the
       
   364      * state reaches TERMINATED.
       
   365      *
       
   366      * Detecting the transition from SHUTDOWN to TIDYING is less
       
   367      * straightforward than you'd like because the queue may become
       
   368      * empty after non-empty and vice versa during SHUTDOWN state, but
       
   369      * we can only terminate if, after seeing that it is empty, we see
       
   370      * that workerCount is 0 (which sometimes entails a recheck -- see
       
   371      * below).
       
   372      */
       
   373     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // starts as RUNNING with workerCount 0
       
   374     private static final int COUNT_BITS = Integer.SIZE - 3; // number of low-order bits that hold workerCount
       
   375     private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // mask with the low COUNT_BITS bits set; also the largest workerCount
       
   376 
       
   377     // runState is stored in the high-order bits
       
           // (the bits above COUNT_BITS). The constants below ascend
       
           // numerically in the order RUNNING < SHUTDOWN < STOP < TIDYING
       
           // < TERMINATED, and each has all workerCount bits clear.
       
   378     private static final int RUNNING    = -1 << COUNT_BITS; // the only negative state
       
   379     private static final int SHUTDOWN   =  0 << COUNT_BITS; // zero
       
   380     private static final int STOP       =  1 << COUNT_BITS;
       
   381     private static final int TIDYING    =  2 << COUNT_BITS;
       
   382     private static final int TERMINATED =  3 << COUNT_BITS;
       
   383 
       
   384     // Packing and unpacking ctl
       
           //   runStateOf:    masks off the low COUNT_BITS bits, leaving the runState bits
       
           //   workerCountOf: keeps only the low COUNT_BITS bits, i.e. the workerCount
       
           //   ctlOf:         ORs a runState (rs) with a workerCount (wc). No validation
       
           //                  is done; it assumes rs has only high-order bits set and
       
           //                  wc does not exceed CAPACITY.
       
   385     private static int runStateOf(int c)     { return c & ~CAPACITY; }
       
   386     private static int workerCountOf(int c)  { return c & CAPACITY; }
       
   387     private static int ctlOf(int rs, int wc) { return rs | wc; }
       
   388 
       
   389     /*
       
   390      * Bit field accessors that don't require unpacking ctl.
       
   391      * These depend on the bit layout and on workerCount being never negative.
       
   392      */
       
   393 
       
   394     private static boolean runStateLessThan(int c, int s) {
       
               // c is a packed ctl value; s is intended to be one of the
       
               // run-state constants (workerCount bits zero). Since runState
       
               // sits in the high-order bits and workerCount is never negative,
       
               // comparing the whole ints orders them by runState.
       
   395         return c < s;
       
   396     }
       
   397 
       
   398     private static boolean runStateAtLeast(int c, int s) {
       
               // c is a packed ctl value; s is intended to be one of the
       
               // run-state constants (workerCount bits zero). The whole-int
       
               // comparison is true exactly when c's runState is s or a
       
               // numerically later state, whatever c's workerCount is.
       
   399         return c >= s;
       
   400     }
       
   401 
       
   402     private static boolean isRunning(int c) {
       
               // SHUTDOWN is zero and RUNNING is the only state constant below
       
               // it, so a packed ctl value is in RUNNING exactly when it is
       
               // negative.
       
   403         return c < SHUTDOWN;
       
   404     }
       
   405 
       
   406     /**
       
   407      * Attempts to CAS-increment the workerCount field of ctl.
       
            * Adding one to the packed value changes only the low-order
       
            * workerCount bits as long as workerCount is below CAPACITY;
       
            * that bound is not checked here, so callers are presumably
       
            * expected to enforce it.
       
            *
       
            * @param expect the packed ctl value the caller last read
       
            * @return true if ctl still held {@code expect} and was replaced
       
            *         by {@code expect + 1}; false if ctl had changed
       
   408      */
       
   409     private boolean compareAndIncrementWorkerCount(int expect) {
       
   410         return ctl.compareAndSet(expect, expect + 1);
       
   411     }
       
   412 
       
   413     /**
       
   414      * Attempts to CAS-decrement the workerCount field of ctl.
       
            * Subtracting one from the packed value changes only the
       
            * low-order workerCount bits as long as workerCount is
       
            * positive; that is not checked here, so callers are
       
            * presumably expected to ensure it.
       
            *
       
            * @param expect the packed ctl value the caller last read
       
            * @return true if ctl still held {@code expect} and was replaced
       
            *         by {@code expect - 1}; false if ctl had changed
       
   415      */
       
   416     private boolean compareAndDecrementWorkerCount(int expect) {
       
   417         return ctl.compareAndSet(expect, expect - 1);
       
   418     }
       
   419 
       
   420     /**
       
   421      * Decrements the workerCount field of ctl. This is called only on
       
   422      * abrupt termination of a thread (see processWorkerExit). Other
       
   423      * decrements are performed within getTask.
       
            *
       
            * <p>Unlike compareAndDecrementWorkerCount, this does not give up
       
            * after a failed CAS: it loops until one attempt succeeds.
       
   424      */
       
   425     private void decrementWorkerCount() {
       
               // Empty-bodied retry loop: each iteration re-reads ctl and
       
               // attempts the CAS decrement against that fresh value.
       
   426         do {} while (! compareAndDecrementWorkerCount(ctl.get()));
       
   427     }
       
   428 
       
   429     /**
       
   430      * The queue used for holding tasks and handing off to worker
       
   431      * threads.  We do not require that workQueue.poll() returning
       
   432      * null necessarily means that workQueue.isEmpty(), so rely
       
   433      * solely on isEmpty to see if the queue is empty (which we must
       
   434      * do for example when deciding whether to transition from
       
   435      * SHUTDOWN to TIDYING).  This accommodates special-purpose
       
   436      * queues such as DelayQueues for which poll() is allowed to
       
   437      * return null even if it may later return non-null when delays
       
   438      * expire.
       
   439      */
       
   440     private final BlockingQueue<Runnable> workQueue;
       
   441 
       
   442     /**
       
   443      * Lock held on access to workers set and related bookkeeping.
       
   444      * While we could use a concurrent set of some sort, it turns out
       
   445      * to be generally preferable to use a lock. Among the reasons is
       
   446      * that this serializes interruptIdleWorkers, which avoids
       
   447      * unnecessary interrupt storms, especially during shutdown.
       
   448      * Otherwise exiting threads would concurrently interrupt those
       
   449      * that have not yet been interrupted. It also simplifies some of the
       
   450      * associated statistics bookkeeping of largestPoolSize etc. We
       
   451      * also hold mainLock on shutdown and shutdownNow, for the sake of
       
   452      * ensuring workers set is stable while separately checking
       
   453      * permission to interrupt and actually interrupting.
       
   454      */
       
   455     private final ReentrantLock mainLock = new ReentrantLock();
       
   456 
       
   457     /**
       
   458      * Set containing all worker threads in pool. Accessed only when
       
   459      * holding mainLock.
       
   460      */
       
   461     private final HashSet<Worker> workers = new HashSet<Worker>();
       
   462 
       
   463     /**
       
   464      * Wait condition to support awaitTermination
       
   465      */
       
   466     private final Condition termination = mainLock.newCondition();
       
   467 
       
   468     /**
       
   469      * Tracks largest attained pool size. Accessed only under
       
   470      * mainLock.
       
   471      */
       
   472     private int largestPoolSize;
       
   473 
       
   474     /**
       
   475      * Counter for completed tasks. Updated only on termination of
       
   476      * worker threads. Accessed only under mainLock.
       
   477      */
       
   478     private long completedTaskCount;
       
   479 
       
   480     /*
       
   481      * All user control parameters are declared as volatiles so that
       
   482      * ongoing actions are based on freshest values, but without need
       
   483      * for locking, since no internal invariants depend on them
       
   484      * changing synchronously with respect to other actions.
       
   485      */
       
   486 
       
   487     /**
       
   488      * Factory for new threads. All threads are created using this
       
   489      * factory (via method addWorker).  All callers must be prepared
       
   490      * for addWorker to fail, which may reflect a system or user's
       
   491      * policy limiting the number of threads.  Even though it is not
       
   492      * treated as an error, failure to create threads may result in
       
   493      * new tasks being rejected or existing ones remaining stuck in
       
   494      * the queue. On the other hand, no special precautions exist to
       
   495      * handle OutOfMemoryErrors that might be thrown while trying to
       
   496      * create threads, since there is generally no recourse from
       
   497      * within this class.
       
   498      */
       
   499     private volatile ThreadFactory threadFactory;
       
   500 
       
   501     /**
       
   502      * Handler called when saturated or shutdown in execute.
       
   503      */
       
   504     private volatile RejectedExecutionHandler handler;
       
   505 
       
   506     /**
       
   507      * Timeout in nanoseconds for idle threads waiting for work.
       
   508      * Threads use this timeout when there are more than corePoolSize
       
   509      * present or if allowCoreThreadTimeOut. Otherwise they wait
       
   510      * forever for new work.
       
   511      */
       
   512     private volatile long keepAliveTime;
       
   513 
       
   514     /**
       
   515      * If false (default), core threads stay alive even when idle.
       
   516      * If true, core threads use keepAliveTime to time out waiting
       
   517      * for work.
       
   518      */
       
   519     private volatile boolean allowCoreThreadTimeOut;
       
   520 
       
   521     /**
       
   522      * Core pool size is the minimum number of workers to keep alive
       
   523      * (and not allow to time out etc) unless allowCoreThreadTimeOut
       
   524      * is set, in which case the minimum is zero.
       
   525      */
       
   526     private volatile int corePoolSize;
       
   527 
       
   528     /**
       
   529      * Maximum pool size. Note that the actual maximum is internally
       
   530      * bounded by CAPACITY.
       
   531      */
       
   532     private volatile int maximumPoolSize;
       
   533 
       
   534     /**
       
   535      * The default rejected execution handler
       
   536      */
       
   537     private static final RejectedExecutionHandler defaultHandler =
       
   538         new AbortPolicy();
       
   539 
       
   540     /**
       
   541      * Permission required for callers of shutdown and shutdownNow.
       
   542      * We additionally require (see checkShutdownAccess) that callers
       
   543      * have permission to actually interrupt threads in the worker set
       
   544      * (as governed by Thread.interrupt, which relies on
       
   545      * ThreadGroup.checkAccess, which in turn relies on
       
   546      * SecurityManager.checkAccess). Shutdowns are attempted only if
       
   547      * these checks pass.
       
   548      *
       
   549      * All actual invocations of Thread.interrupt (see
       
   550      * interruptIdleWorkers and interruptWorkers) ignore
       
   551      * SecurityExceptions, meaning that the attempted interrupts
       
   552      * silently fail. In the case of shutdown, they should not fail
       
   553      * unless the SecurityManager has inconsistent policies, sometimes
       
   554      * allowing access to a thread and sometimes not. In such cases,
       
   555      * failure to actually interrupt threads may disable or delay full
       
   556      * termination. Other uses of interruptIdleWorkers are advisory,
       
   557      * and failure to actually interrupt will merely delay response to
       
   558      * configuration changes so is not handled exceptionally.
       
   559      */
       
   560     private static final RuntimePermission shutdownPerm =
       
   561         new RuntimePermission("modifyThread"); // checked via SecurityManager.checkPermission in checkShutdownAccess
       
   562 
       
   563     /**
       
   564      * Class Worker mainly maintains interrupt control state for
       
   565      * threads running tasks, along with other minor bookkeeping.
       
   566      * This class opportunistically extends AbstractQueuedSynchronizer
       
   567      * to simplify acquiring and releasing a lock surrounding each
       
   568      * task execution.  This protects against interrupts that are
       
   569      * intended to wake up a worker thread waiting for a task from
       
   570      * instead interrupting a task being run.  We implement a simple
       
   571      * non-reentrant mutual exclusion lock rather than use ReentrantLock
       
   572      * because we do not want worker tasks to be able to reacquire the
       
   573      * lock when they invoke pool control methods like setCorePoolSize.
       
   574      */
       
   575     private final class Worker
       
   576         extends AbstractQueuedSynchronizer
       
   577         implements Runnable
       
   578     {
       
   579         /**
       
   580          * This class will never be serialized, but we provide a
       
   581          * serialVersionUID to suppress a javac warning.
       
   582          */
       
   583         private static final long serialVersionUID = 6138294804551838833L;
       
   584 
       
   585         /** Thread this worker is running in.  Null if factory fails. */
       
   586         final Thread thread;
       
   587         /** Initial task to run.  Possibly null. */
       
   588         Runnable firstTask;
       
   589         /** Per-thread task counter */
       
   590         volatile long completedTasks;
       
   591 
       
   592         /**
       
   593          * Creates with given first task and thread from ThreadFactory.
       
   594          * @param firstTask the first task (null if none)
       
   595          */
       
   596         Worker(Runnable firstTask) {
       
   597             this.firstTask = firstTask;
       
   598             this.thread = getThreadFactory().newThread(this);
       
   599         }
       
   600 
       
   601         /** Delegates main run loop to outer runWorker  */
       
   602         public void run() {
       
   603             runWorker(this);
       
   604         }
       
   605 
       
   606         // Lock methods
       
   607         //
       
   608         // The value 0 represents the unlocked state.
       
   609         // The value 1 represents the locked state.
       
   610 
       
   611         protected boolean isHeldExclusively() {
       
   612             return getState() == 1;
       
   613         }
       
   614 
       
   615         protected boolean tryAcquire(int unused) {
       
   616             if (compareAndSetState(0, 1)) {
       
   617                 setExclusiveOwnerThread(Thread.currentThread());
       
   618                 return true;
       
   619             }
       
   620             return false;
       
   621         }
       
   622 
       
   623         protected boolean tryRelease(int unused) {
       
   624             setExclusiveOwnerThread(null);
       
   625             setState(0);
       
   626             return true;
       
   627         }
       
   628 
       
   629         public void lock()        { acquire(1); }
       
   630         public boolean tryLock()  { return tryAcquire(1); }
       
   631         public void unlock()      { release(1); }
       
   632         public boolean isLocked() { return isHeldExclusively(); }
       
   633     }
       
   634 
       
   635     /*
       
   636      * Methods for setting control state
       
   637      */
       
   638 
       
   639     /**
       
   640      * Transitions runState to given target, or leaves it alone if
       
   641      * already at least the given target.
       
   642      *
       
   643      * @param targetState the desired state, either SHUTDOWN or STOP
       
   644      *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
       
   645      */
       
   646     private void advanceRunState(int targetState) {
       
   647         for (;;) {
       
   648             int c = ctl.get();
       
   649             if (runStateAtLeast(c, targetState) ||
       
   650                 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
       
   651                 break;
       
   652         }
       
   653     }
       
   654 
       
   655     /**
       
   656      * Transitions to TERMINATED state if either (SHUTDOWN and pool
       
   657      * and queue empty) or (STOP and pool empty).  If otherwise
       
   658      * eligible to terminate but workerCount is nonzero, interrupts an
       
   659      * idle worker to ensure that shutdown signals propagate. This
       
   660      * method must be called following any action that might make
       
   661      * termination possible -- reducing worker count or removing tasks
       
   662      * from the queue during shutdown. The method is non-private to
       
   663      * allow access from ScheduledThreadPoolExecutor.
       
   664      */
       
   665     final void tryTerminate() {
       
   666         for (;;) {
       
   667             int c = ctl.get();
       
   668             if (isRunning(c) ||
       
   669                 runStateAtLeast(c, TIDYING) ||
       
   670                 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
       
   671                 return;
       
   672             if (workerCountOf(c) != 0) { // Eligible to terminate
       
   673                 interruptIdleWorkers(ONLY_ONE);
       
   674                 return;
       
   675             }
       
   676 
       
   677             final ReentrantLock mainLock = this.mainLock;
       
   678             mainLock.lock();
       
   679             try {
       
   680                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
       
   681                     try {
       
   682                         terminated();
       
   683                     } finally {
       
   684                         ctl.set(ctlOf(TERMINATED, 0));
       
   685                         termination.signalAll();
       
   686                     }
       
   687                     return;
       
   688                 }
       
   689             } finally {
       
   690                 mainLock.unlock();
       
   691             }
       
   692             // else retry on failed CAS
       
   693         }
       
   694     }
       
   695 
       
   696     /*
       
   697      * Methods for controlling interrupts to worker threads.
       
   698      */
       
   699 
       
   700     /**
       
   701      * If there is a security manager, makes sure caller has
       
   702      * permission to shut down threads in general (see shutdownPerm).
       
   703      * If this passes, additionally makes sure the caller is allowed
       
   704      * to interrupt each worker thread. This might not be true even if
       
   705      * first check passed, if the SecurityManager treats some threads
       
   706      * specially.
       
   707      */
       
   708     private void checkShutdownAccess() {
       
   709         SecurityManager security = System.getSecurityManager();
       
   710         if (security != null) {
       
   711             security.checkPermission(shutdownPerm);
       
   712             final ReentrantLock mainLock = this.mainLock;
       
   713             mainLock.lock();
       
   714             try {
       
   715                 for (Worker w : workers)
       
   716                     security.checkAccess(w.thread);
       
   717             } finally {
       
   718                 mainLock.unlock();
       
   719             }
       
   720         }
       
   721     }
       
   722 
       
   723     /**
       
   724      * Interrupts all threads, even if active. Ignores SecurityExceptions
       
   725      * (in which case some threads may remain uninterrupted).
       
   726      */
       
   727     private void interruptWorkers() {
       
   728         final ReentrantLock mainLock = this.mainLock;
       
   729         mainLock.lock();
       
   730         try {
       
   731             for (Worker w : workers) {
       
   732                 try {
       
   733                     w.thread.interrupt();
       
   734                 } catch (SecurityException ignore) {
       
   735                 }
       
   736             }
       
   737         } finally {
       
   738             mainLock.unlock();
       
   739         }
       
   740     }
       
   741 
       
   742     /**
       
   743      * Interrupts threads that might be waiting for tasks (as
       
   744      * indicated by not being locked) so they can check for
       
   745      * termination or configuration changes. Ignores
       
   746      * SecurityExceptions (in which case some threads may remain
       
   747      * uninterrupted).
       
   748      *
       
   749      * @param onlyOne If true, interrupt at most one worker. This is
       
   750      * called only from tryTerminate when termination is otherwise
       
   751      * enabled but there are still other workers.  In this case, at
       
   752      * most one waiting worker is interrupted to propagate shutdown
       
   753      * signals in case all threads are currently waiting.
       
   754      * Interrupting any arbitrary thread ensures that newly arriving
       
   755      * workers since shutdown began will also eventually exit.
       
   756      * To guarantee eventual termination, it suffices to always
       
   757      * interrupt only one idle worker, but shutdown() interrupts all
       
   758      * idle workers so that redundant workers exit promptly, not
       
   759      * waiting for a straggler task to finish.
       
   760      */
       
   761     private void interruptIdleWorkers(boolean onlyOne) {
       
   762         final ReentrantLock mainLock = this.mainLock;
       
   763         mainLock.lock();
       
   764         try {
       
   765             for (Worker w : workers) {
       
   766                 Thread t = w.thread;
       
   767                 if (!t.isInterrupted() && w.tryLock()) {
       
   768                     try {
       
   769                         t.interrupt();
       
   770                     } catch (SecurityException ignore) {
       
   771                     } finally {
       
   772                         w.unlock();
       
   773                     }
       
   774                 }
       
   775                 if (onlyOne)
       
   776                     break;
       
   777             }
       
   778         } finally {
       
   779             mainLock.unlock();
       
   780         }
       
   781     }
       
   782 
       
   783     /**
       
   784      * Common form of interruptIdleWorkers, to avoid having to
       
   785      * remember what the boolean argument means.
       
   786      */
       
   787     private void interruptIdleWorkers() {
       
   788         interruptIdleWorkers(false);
       
   789     }
       
   790 
       
   791     private static final boolean ONLY_ONE = true; // named argument for interruptIdleWorkers(boolean); used by tryTerminate
       
   792 
       
   793     /**
       
   794      * Ensures that unless the pool is stopping, the current thread
       
   795      * does not have its interrupt set. This requires a double-check
       
   796      * of state in case the interrupt was cleared concurrently with a
       
   797      * shutdownNow -- if so, the interrupt is re-enabled.
       
   798      */
       
   799     private void clearInterruptsForTaskRun() {
       
   800         if (runStateLessThan(ctl.get(), STOP) &&
       
   801             Thread.interrupted() &&
       
   802             runStateAtLeast(ctl.get(), STOP))
       
   803             Thread.currentThread().interrupt();
       
   804     }
       
   805 
       
   806     /*
       
   807      * Misc utilities, most of which are also exported to
       
   808      * ScheduledThreadPoolExecutor
       
   809      */
       
   810 
       
   811     /**
       
   812      * Invokes the rejected execution handler for the given command.
       
   813      * Package-private for use by ScheduledThreadPoolExecutor.
       
   814      */
       
   815     final void reject(Runnable command) {
       
   816         handler.rejectedExecution(command, this); // delegate to the configured handler, passing this pool as the executor
       
   817     }
       
   818 
       
   819     /**
       
   820      * Performs any further cleanup following run state transition on
       
   821      * invocation of shutdown.  A no-op here, but used by
       
   822      * ScheduledThreadPoolExecutor to cancel delayed tasks.
       
   823      */
       
   824     void onShutdown() {
        // Intentionally empty: this class needs no extra cleanup on shutdown.
   825     }
       
   826 
       
   827     /**
       
   828      * State check needed by ScheduledThreadPoolExecutor to
       
   829      * enable running tasks during shutdown.
       
   830      *
       
   831      * @param shutdownOK true if should return true if SHUTDOWN
       
   832      */
       
   833     final boolean isRunningOrShutdown(boolean shutdownOK) {
       
   834         int rs = runStateOf(ctl.get());
       
   835         return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
       
   836     }
       
   837 
       
   838     /**
       
   839      * Drains the task queue into a new list, normally using
       
   840      * drainTo. But if the queue is a DelayQueue or any other kind of
       
   841      * queue for which poll or drainTo may fail to remove some
       
   842      * elements, it deletes them one by one.
       
   843      */
       
   844     private List<Runnable> drainQueue() {
       
   845         BlockingQueue<Runnable> q = workQueue;
       
   846         List<Runnable> taskList = new ArrayList<Runnable>();
       
   847         q.drainTo(taskList);
       
   848         if (!q.isEmpty()) {
       
   849             for (Runnable r : q.toArray(new Runnable[0])) {
       
   850                 if (q.remove(r))
       
   851                     taskList.add(r);
       
   852             }
       
   853         }
       
   854         return taskList;
       
   855     }
       
   856 
       
   857     /*
       
   858      * Methods for creating, running and cleaning up after workers
       
   859      */
       
   860 
       
   861     /**
       
   862      * Checks if a new worker can be added with respect to current
       
   863      * pool state and the given bound (either core or maximum). If so,
       
   864      * the worker count is adjusted accordingly, and, if possible, a
       
   865      * new worker is created and started running firstTask as its
       
   866      * first task. This method returns false if the pool is stopped or
       
   867      * eligible to shut down. It also returns false if the thread
       
   868      * factory fails to create a thread when asked, which requires a
       
   869      * backout of workerCount, and a recheck for termination, in case
       
   870      * the existence of this worker was holding up termination.
       
   871      *
       
   872      * @param firstTask the task the new thread should run first (or
       
   873      * null if none). Workers are created with an initial first task
       
   874      * (in method execute()) to bypass queuing when there are fewer
       
   875      * than corePoolSize threads (in which case we always start one),
       
   876      * or when the queue is full (in which case we must bypass queue).
       
   877      * Initially idle threads are usually created via
       
   878      * prestartCoreThread or to replace other dying workers.
       
   879      *
       
   880      * @param core if true use corePoolSize as bound, else
       
   881      * maximumPoolSize. (A boolean indicator is used here rather than a
       
   882      * value to ensure reads of fresh values after checking other pool
       
   883      * state).
       
   884      * @return true if successful
       
   885      */
       
   886     private boolean addWorker(Runnable firstTask, boolean core) {
       
   887         retry:
       
   888         for (;;) {
       
   889             int c = ctl.get();
       
   890             int rs = runStateOf(c);
       
   891 
       
   892             // Check if queue empty only if necessary.
       
   893             if (rs >= SHUTDOWN &&
       
   894                 ! (rs == SHUTDOWN &&
       
   895                    firstTask == null &&
       
   896                    ! workQueue.isEmpty()))
       
   897                 return false;
       
   898 
       
   899             for (;;) {
       
   900                 int wc = workerCountOf(c);
       
   901                 if (wc >= CAPACITY ||
       
   902                     wc >= (core ? corePoolSize : maximumPoolSize))
       
   903                     return false;
       
   904                 if (compareAndIncrementWorkerCount(c))
       
   905                     break retry;
       
   906                 c = ctl.get();  // Re-read ctl
       
   907                 if (runStateOf(c) != rs)
       
   908                     continue retry;
       
   909                 // else CAS failed due to workerCount change; retry inner loop
       
   910             }
       
   911         }
       
   912 
       
   913         Worker w = new Worker(firstTask);
       
   914         Thread t = w.thread;
       
   915 
       
   916         final ReentrantLock mainLock = this.mainLock;
       
   917         mainLock.lock();
       
   918         try {
       
   919             // Recheck while holding lock.
       
   920             // Back out on ThreadFactory failure or if
       
   921             // shut down before lock acquired.
       
   922             int c = ctl.get();
       
   923             int rs = runStateOf(c);
       
   924 
       
   925             if (t == null ||
       
   926                 (rs >= SHUTDOWN &&
       
   927                  ! (rs == SHUTDOWN &&
       
   928                     firstTask == null))) {
       
   929                 decrementWorkerCount();
       
   930                 tryTerminate();
       
   931                 return false;
       
   932             }
       
   933 
       
   934             workers.add(w);
       
   935 
       
   936             int s = workers.size();
       
   937             if (s > largestPoolSize)
       
   938                 largestPoolSize = s;
       
   939         } finally {
       
   940             mainLock.unlock();
       
   941         }
       
   942 
       
   943         t.start();
       
   944         // It is possible (but unlikely) for a thread to have been
       
   945         // added to workers, but not yet started, during transition to
       
   946         // STOP, which could result in a rare missed interrupt,
       
   947         // because Thread.interrupt is not guaranteed to have any effect
       
   948         // on a non-yet-started Thread (see Thread#interrupt).
       
   949         if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
       
   950             t.interrupt();
       
   951 
       
   952         return true;
       
   953     }
       
   954 
       
   955     /**
       
   956      * Performs cleanup and bookkeeping for a dying worker. Called
       
   957      * only from worker threads. Unless completedAbruptly is set,
       
   958      * assumes that workerCount has already been adjusted to account
       
   959      * for exit.  This method removes thread from worker set, and
       
   960      * possibly terminates the pool or replaces the worker if either
       
   961      * it exited due to user task exception or if fewer than
       
   962      * corePoolSize workers are running or queue is non-empty but
       
   963      * there are no workers.
       
   964      *
       
   965      * @param w the worker
       
   966      * @param completedAbruptly if the worker died due to user exception
       
   967      */
       
   968     private void processWorkerExit(Worker w, boolean completedAbruptly) {
       
   969         if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
       
   970             decrementWorkerCount();
       
   971 
       
   972         final ReentrantLock mainLock = this.mainLock;
       
   973         mainLock.lock();
       
   974         try {
       
   975             completedTaskCount += w.completedTasks;
       
   976             workers.remove(w);
       
   977         } finally {
       
   978             mainLock.unlock();
       
   979         }
       
   980 
       
   981         tryTerminate();
       
   982 
       
   983         int c = ctl.get();
       
   984         if (runStateLessThan(c, STOP)) {
       
   985             if (!completedAbruptly) {
       
   986                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
       
   987                 if (min == 0 && ! workQueue.isEmpty())
       
   988                     min = 1;
       
   989                 if (workerCountOf(c) >= min)
       
   990                     return; // replacement not needed
       
   991             }
       
   992             addWorker(null, false);
       
   993         }
       
   994     }
       
   995 
       
   996     /**
       
   997      * Performs blocking or timed wait for a task, depending on
       
   998      * current configuration settings, or returns null if this worker
       
   999      * must exit because of any of:
       
  1000      * 1. There are more than maximumPoolSize workers (due to
       
  1001      *    a call to setMaximumPoolSize).
       
  1002      * 2. The pool is stopped.
       
  1003      * 3. The pool is shutdown and the queue is empty.
       
  1004      * 4. This worker timed out waiting for a task, and timed-out
       
  1005      *    workers are subject to termination (that is,
       
  1006      *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
       
  1007      *    both before and after the timed wait.
       
  1008      *
       
  1009      * @return task, or null if the worker must exit, in which case
       
  1010      *         workerCount is decremented
       
  1011      */
       
  1012     private Runnable getTask() {
       
  1013         boolean timedOut = false; // Did the last poll() time out?
       
  1014 
       
  1015         retry:
       
  1016         for (;;) {
       
  1017             int c = ctl.get();
       
  1018             int rs = runStateOf(c);
       
  1019 
       
  1020             // Check if queue empty only if necessary.
       
  1021             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
       
  1022                 decrementWorkerCount();
       
  1023                 return null;
       
  1024             }
       
  1025 
       
  1026             boolean timed;      // Are workers subject to culling?
       
  1027 
       
  1028             for (;;) {
       
  1029                 int wc = workerCountOf(c);
       
  1030                 timed = allowCoreThreadTimeOut || wc > corePoolSize;
       
  1031 
       
  1032                 if (wc <= maximumPoolSize && ! (timedOut && timed))
       
  1033                     break;
       
  1034                 if (compareAndDecrementWorkerCount(c))
       
  1035                     return null;
       
  1036                 c = ctl.get();  // Re-read ctl
       
  1037                 if (runStateOf(c) != rs)
       
  1038                     continue retry;
       
  1039                 // else CAS failed due to workerCount change; retry inner loop
       
  1040             }
       
  1041 
       
  1042             try {
       
  1043                 Runnable r = timed ?
       
  1044                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
       
  1045                     workQueue.take();
       
  1046                 if (r != null)
       
  1047                     return r;
       
  1048                 timedOut = true;
       
  1049             } catch (InterruptedException retry) {
       
  1050                 timedOut = false;
       
  1051             }
       
  1052         }
       
  1053     }
       
  1054 
       
  1055     /**
       
  1056      * Main worker run loop.  Repeatedly gets tasks from queue and
       
  1057      * executes them, while coping with a number of issues:
       
  1058      *
       
  1059      * 1. We may start out with an initial task, in which case we
       
  1060      * don't need to get the first one. Otherwise, as long as pool is
       
  1061      * running, we get tasks from getTask. If it returns null then the
       
  1062      * worker exits due to changed pool state or configuration
       
  1063      * parameters.  Other exits result from exception throws in
       
  1064      * external code, in which case completedAbruptly holds, which
       
  1065      * usually leads processWorkerExit to replace this thread.
       
  1066      *
       
  1067      * 2. Before running any task, the lock is acquired to prevent
       
  1068      * other pool interrupts while the task is executing, and
       
  1069      * clearInterruptsForTaskRun called to ensure that unless pool is
       
  1070      * stopping, this thread does not have its interrupt set.
       
  1071      *
       
  1072      * 3. Each task run is preceded by a call to beforeExecute, which
       
  1073      * might throw an exception, in which case we cause thread to die
       
  1074      * (breaking loop with completedAbruptly true) without processing
       
  1075      * the task.
       
  1076      *
       
  1077      * 4. Assuming beforeExecute completes normally, we run the task,
       
  1078      * gathering any of its thrown exceptions to send to
       
  1079      * afterExecute. We separately handle RuntimeException, Error
       
  1080      * (both of which the specs guarantee that we trap) and arbitrary
       
  1081      * Throwables.  Because we cannot rethrow Throwables within
       
  1082      * Runnable.run, we wrap them within Errors on the way out (to the
       
  1083      * thread's UncaughtExceptionHandler).  Any thrown exception also
       
  1084      * conservatively causes thread to die.
       
  1085      *
       
  1086      * 5. After task.run completes, we call afterExecute, which may
       
  1087      * also throw an exception, which will also cause thread to
       
  1088      * die. According to JLS Sec 14.20, this exception is the one that
       
  1089      * will be in effect even if task.run throws.
       
  1090      *
       
  1091      * The net effect of the exception mechanics is that afterExecute
       
  1092      * and the thread's UncaughtExceptionHandler have as accurate
       
  1093      * information as we can provide about any problems encountered by
       
  1094      * user code.
       
  1095      *
       
  1096      * @param w the worker
       
  1097      */
       
  1098     final void runWorker(Worker w) {
       
  1099         Runnable task = w.firstTask;
       
  1100         w.firstTask = null;
       
  1101         boolean completedAbruptly = true;
       
  1102         try {
       
  1103             while (task != null || (task = getTask()) != null) {
       
  1104                 w.lock();
       
  1105                 clearInterruptsForTaskRun();
       
  1106                 try {
       
  1107                     beforeExecute(w.thread, task);
       
  1108                     Throwable thrown = null;
       
  1109                     try {
       
  1110                         task.run();
       
  1111                     } catch (RuntimeException x) {
       
  1112                         thrown = x; throw x;
       
  1113                     } catch (Error x) {
       
  1114                         thrown = x; throw x;
       
  1115                     } catch (Throwable x) {
       
  1116                         thrown = x; throw new Error(x);
       
  1117                     } finally {
       
  1118                         afterExecute(task, thrown);
       
  1119                     }
       
  1120                 } finally {
       
  1121                     task = null;
       
  1122                     w.completedTasks++;
       
  1123                     w.unlock();
       
  1124                 }
       
  1125             }
       
  1126             completedAbruptly = false;
       
  1127         } finally {
       
  1128             processWorkerExit(w, completedAbruptly);
       
  1129         }
       
  1130     }
       
  1131 
       
  1132     // Public constructors and methods
       
  1133 
       
  1134     /**
       
  1135      * Creates a new {@code ThreadPoolExecutor} with the given initial
       
  1136      * parameters and default thread factory and rejected execution handler.
       
  1137      * It may be more convenient to use one of the {@link Executors} factory
       
  1138      * methods instead of this general purpose constructor.
       
  1139      *
       
  1140      * @param corePoolSize the number of threads to keep in the pool, even
       
  1141      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
       
  1142      * @param maximumPoolSize the maximum number of threads to allow in the
       
  1143      *        pool
       
  1144      * @param keepAliveTime when the number of threads is greater than
       
  1145      *        the core, this is the maximum time that excess idle threads
       
  1146      *        will wait for new tasks before terminating.
       
  1147      * @param unit the time unit for the {@code keepAliveTime} argument
       
  1148      * @param workQueue the queue to use for holding tasks before they are
       
  1149      *        executed.  This queue will hold only the {@code Runnable}
       
  1150      *        tasks submitted by the {@code execute} method.
       
  1151      * @throws IllegalArgumentException if one of the following holds:<br>
       
  1152      *         {@code corePoolSize < 0}<br>
       
  1153      *         {@code keepAliveTime < 0}<br>
       
  1154      *         {@code maximumPoolSize <= 0}<br>
       
  1155      *         {@code maximumPoolSize < corePoolSize}
       
  1156      * @throws NullPointerException if {@code workQueue} is null
       
  1157      */
       
  1158     public ThreadPoolExecutor(int corePoolSize,
       
  1159                               int maximumPoolSize,
       
  1160                               long keepAliveTime,
       
  1161                               TimeUnit unit,
       
  1162                               BlockingQueue<Runnable> workQueue) {
       
  1163         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
       
  1164              Executors.defaultThreadFactory(), defaultHandler);
       
  1165     }
       
  1166 
       
  1167     /**
       
  1168      * Creates a new {@code ThreadPoolExecutor} with the given initial
       
  1169      * parameters and default rejected execution handler.
       
  1170      *
       
  1171      * @param corePoolSize the number of threads to keep in the pool, even
       
  1172      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
       
  1173      * @param maximumPoolSize the maximum number of threads to allow in the
       
  1174      *        pool
       
  1175      * @param keepAliveTime when the number of threads is greater than
       
  1176      *        the core, this is the maximum time that excess idle threads
       
  1177      *        will wait for new tasks before terminating.
       
  1178      * @param unit the time unit for the {@code keepAliveTime} argument
       
  1179      * @param workQueue the queue to use for holding tasks before they are
       
  1180      *        executed.  This queue will hold only the {@code Runnable}
       
  1181      *        tasks submitted by the {@code execute} method.
       
  1182      * @param threadFactory the factory to use when the executor
       
  1183      *        creates a new thread
       
  1184      * @throws IllegalArgumentException if one of the following holds:<br>
       
  1185      *         {@code corePoolSize < 0}<br>
       
  1186      *         {@code keepAliveTime < 0}<br>
       
  1187      *         {@code maximumPoolSize <= 0}<br>
       
  1188      *         {@code maximumPoolSize < corePoolSize}
       
  1189      * @throws NullPointerException if {@code workQueue}
       
  1190      *         or {@code threadFactory} is null
       
  1191      */
       
  1192     public ThreadPoolExecutor(int corePoolSize,
       
  1193                               int maximumPoolSize,
       
  1194                               long keepAliveTime,
       
  1195                               TimeUnit unit,
       
  1196                               BlockingQueue<Runnable> workQueue,
       
  1197                               ThreadFactory threadFactory) {
       
  1198         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
       
  1199              threadFactory, defaultHandler);
       
  1200     }
       
  1201 
       
  1202     /**
       
  1203      * Creates a new {@code ThreadPoolExecutor} with the given initial
       
  1204      * parameters and default thread factory.
       
  1205      *
       
  1206      * @param corePoolSize the number of threads to keep in the pool, even
       
  1207      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
       
  1208      * @param maximumPoolSize the maximum number of threads to allow in the
       
  1209      *        pool
       
  1210      * @param keepAliveTime when the number of threads is greater than
       
  1211      *        the core, this is the maximum time that excess idle threads
       
  1212      *        will wait for new tasks before terminating.
       
  1213      * @param unit the time unit for the {@code keepAliveTime} argument
       
  1214      * @param workQueue the queue to use for holding tasks before they are
       
  1215      *        executed.  This queue will hold only the {@code Runnable}
       
  1216      *        tasks submitted by the {@code execute} method.
       
  1217      * @param handler the handler to use when execution is blocked
       
  1218      *        because the thread bounds and queue capacities are reached
       
  1219      * @throws IllegalArgumentException if one of the following holds:<br>
       
  1220      *         {@code corePoolSize < 0}<br>
       
  1221      *         {@code keepAliveTime < 0}<br>
       
  1222      *         {@code maximumPoolSize <= 0}<br>
       
  1223      *         {@code maximumPoolSize < corePoolSize}
       
  1224      * @throws NullPointerException if {@code workQueue}
       
  1225      *         or {@code handler} is null
       
  1226      */
       
  1227     public ThreadPoolExecutor(int corePoolSize,
       
  1228                               int maximumPoolSize,
       
  1229                               long keepAliveTime,
       
  1230                               TimeUnit unit,
       
  1231                               BlockingQueue<Runnable> workQueue,
       
  1232                               RejectedExecutionHandler handler) {
       
  1233         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
       
  1234              Executors.defaultThreadFactory(), handler);
       
  1235     }
       
  1236 
       
  1237     /**
       
  1238      * Creates a new {@code ThreadPoolExecutor} with the given initial
       
  1239      * parameters.
       
  1240      *
       
  1241      * @param corePoolSize the number of threads to keep in the pool, even
       
  1242      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
       
  1243      * @param maximumPoolSize the maximum number of threads to allow in the
       
  1244      *        pool
       
  1245      * @param keepAliveTime when the number of threads is greater than
       
  1246      *        the core, this is the maximum time that excess idle threads
       
  1247      *        will wait for new tasks before terminating.
       
  1248      * @param unit the time unit for the {@code keepAliveTime} argument
       
  1249      * @param workQueue the queue to use for holding tasks before they are
       
  1250      *        executed.  This queue will hold only the {@code Runnable}
       
  1251      *        tasks submitted by the {@code execute} method.
       
  1252      * @param threadFactory the factory to use when the executor
       
  1253      *        creates a new thread
       
  1254      * @param handler the handler to use when execution is blocked
       
  1255      *        because the thread bounds and queue capacities are reached
       
  1256      * @throws IllegalArgumentException if one of the following holds:<br>
       
  1257      *         {@code corePoolSize < 0}<br>
       
  1258      *         {@code keepAliveTime < 0}<br>
       
  1259      *         {@code maximumPoolSize <= 0}<br>
       
  1260      *         {@code maximumPoolSize < corePoolSize}
       
  1261      * @throws NullPointerException if {@code workQueue}
       
  1262      *         or {@code threadFactory} or {@code handler} is null
       
  1263      */
       
  1264     public ThreadPoolExecutor(int corePoolSize,
       
  1265                               int maximumPoolSize,
       
  1266                               long keepAliveTime,
       
  1267                               TimeUnit unit,
       
  1268                               BlockingQueue<Runnable> workQueue,
       
  1269                               ThreadFactory threadFactory,
       
  1270                               RejectedExecutionHandler handler) {
       
  1271         if (corePoolSize < 0 ||
       
  1272             maximumPoolSize <= 0 ||
       
  1273             maximumPoolSize < corePoolSize ||
       
  1274             keepAliveTime < 0)
       
  1275             throw new IllegalArgumentException();
       
  1276         if (workQueue == null || threadFactory == null || handler == null)
       
  1277             throw new NullPointerException();
       
  1278         this.corePoolSize = corePoolSize;
       
  1279         this.maximumPoolSize = maximumPoolSize;
       
  1280         this.workQueue = workQueue;
       
  1281         this.keepAliveTime = unit.toNanos(keepAliveTime);
       
  1282         this.threadFactory = threadFactory;
       
  1283         this.handler = handler;
       
  1284     }
       
  1285 
       
  1286     /**
       
  1287      * Executes the given task sometime in the future.  The task
       
  1288      * may execute in a new thread or in an existing pooled thread.
       
  1289      *
       
  1290      * If the task cannot be submitted for execution, either because this
       
  1291      * executor has been shutdown or because its capacity has been reached,
       
  1292      * the task is handled by the current {@code RejectedExecutionHandler}.
       
  1293      *
       
  1294      * @param command the task to execute
       
  1295      * @throws RejectedExecutionException at discretion of
       
  1296      *         {@code RejectedExecutionHandler}, if the task
       
  1297      *         cannot be accepted for execution
       
  1298      * @throws NullPointerException if {@code command} is null
       
  1299      */
       
  1300     public void execute(Runnable command) {
       
  1301         if (command == null)
       
  1302             throw new NullPointerException();
       
  1303         /*
       
  1304          * Proceed in 3 steps:
       
  1305          *
       
  1306          * 1. If fewer than corePoolSize threads are running, try to
       
  1307          * start a new thread with the given command as its first
       
  1308          * task.  The call to addWorker atomically checks runState and
       
  1309          * workerCount, and so prevents false alarms that would add
       
  1310          * threads when it shouldn't, by returning false.
       
  1311          *
       
  1312          * 2. If a task can be successfully queued, then we still need
       
  1313          * to double-check whether we should have added a thread
       
  1314          * (because existing ones died since last checking) or that
       
  1315          * the pool shut down since entry into this method. So we
       
  1316          * recheck state and if necessary roll back the enqueuing if
       
  1317          * stopped, or start a new thread if there are none.
       
  1318          *
       
  1319          * 3. If we cannot queue task, then we try to add a new
       
  1320          * thread.  If it fails, we know we are shut down or saturated
       
  1321          * and so reject the task.
       
  1322          */
       
  1323         int c = ctl.get();
       
  1324         if (workerCountOf(c) < corePoolSize) {
       
  1325             if (addWorker(command, true))
       
  1326                 return;
       
  1327             c = ctl.get();
       
  1328         }
       
  1329         if (isRunning(c) && workQueue.offer(command)) {
       
  1330             int recheck = ctl.get();
       
  1331             if (! isRunning(recheck) && remove(command))
       
  1332                 reject(command);
       
  1333             else if (workerCountOf(recheck) == 0)
       
  1334                 addWorker(null, false);
       
  1335         }
       
  1336         else if (!addWorker(command, false))
       
  1337             reject(command);
       
  1338     }
       
  1339 
       
  1340     /**
       
  1341      * Initiates an orderly shutdown in which previously submitted
       
  1342      * tasks are executed, but no new tasks will be accepted.
       
  1343      * Invocation has no additional effect if already shut down.
       
  1344      *
       
  1345      * @throws SecurityException {@inheritDoc}
       
  1346      */
       
  1347     public void shutdown() {
       
  1348         final ReentrantLock mainLock = this.mainLock;
       
  1349         mainLock.lock();
       
  1350         try {
       
  1351             checkShutdownAccess();
       
  1352             advanceRunState(SHUTDOWN);
       
  1353             interruptIdleWorkers();
       
  1354             onShutdown(); // hook for ScheduledThreadPoolExecutor
       
  1355         } finally {
       
  1356             mainLock.unlock();
       
  1357         }
       
  1358         tryTerminate();
       
  1359     }
       
  1360 
       
  1361     /**
       
  1362      * Attempts to stop all actively executing tasks, halts the
       
  1363      * processing of waiting tasks, and returns a list of the tasks
       
  1364      * that were awaiting execution. These tasks are drained (removed)
       
  1365      * from the task queue upon return from this method.
       
  1366      *
       
  1367      * <p>There are no guarantees beyond best-effort attempts to stop
       
  1368      * processing actively executing tasks.  This implementation
       
  1369      * cancels tasks via {@link Thread#interrupt}, so any task that
       
  1370      * fails to respond to interrupts may never terminate.
       
  1371      *
       
  1372      * @throws SecurityException {@inheritDoc}
       
  1373      */
       
  1374     public List<Runnable> shutdownNow() {
       
  1375         List<Runnable> tasks;
       
  1376         final ReentrantLock mainLock = this.mainLock;
       
  1377         mainLock.lock();
       
  1378         try {
       
  1379             checkShutdownAccess();
       
  1380             advanceRunState(STOP);
       
  1381             interruptWorkers();
       
  1382             tasks = drainQueue();
       
  1383         } finally {
       
  1384             mainLock.unlock();
       
  1385         }
       
  1386         tryTerminate();
       
  1387         return tasks;
       
  1388     }
       
  1389 
       
  1390     public boolean isShutdown() {
       
  1391         return ! isRunning(ctl.get());
       
  1392     }
       
  1393 
       
  1394     /**
       
  1395      * Returns true if this executor is in the process of terminating
       
  1396      * after {@link #shutdown} or {@link #shutdownNow} but has not
       
  1397      * completely terminated.  This method may be useful for
       
  1398      * debugging. A return of {@code true} reported a sufficient
       
  1399      * period after shutdown may indicate that submitted tasks have
       
  1400      * ignored or suppressed interruption, causing this executor not
       
  1401      * to properly terminate.
       
  1402      *
       
  1403      * @return true if terminating but not yet terminated
       
  1404      */
       
  1405     public boolean isTerminating() {
       
  1406         int c = ctl.get();
       
  1407         return ! isRunning(c) && runStateLessThan(c, TERMINATED);
       
  1408     }
       
  1409 
       
  1410     public boolean isTerminated() {
       
  1411         return runStateAtLeast(ctl.get(), TERMINATED);
       
  1412     }
       
  1413 
       
  1414     public boolean awaitTermination(long timeout, TimeUnit unit)
       
  1415         throws InterruptedException {
       
  1416         long nanos = unit.toNanos(timeout);
       
  1417         final ReentrantLock mainLock = this.mainLock;
       
  1418         mainLock.lock();
       
  1419         try {
       
  1420             for (;;) {
       
  1421                 if (runStateAtLeast(ctl.get(), TERMINATED))
       
  1422                     return true;
       
  1423                 if (nanos <= 0)
       
  1424                     return false;
       
  1425                 nanos = termination.awaitNanos(nanos);
       
  1426             }
       
  1427         } finally {
       
  1428             mainLock.unlock();
       
  1429         }
       
  1430     }
       
  1431 
       
  1432     /**
       
  1433      * Invokes {@code shutdown} when this executor is no longer
       
  1434      * referenced and it has no threads.
       
  1435      */
       
  1436     protected void finalize() {
       
  1437         shutdown();
       
  1438     }
       
  1439 
       
  1440     /**
       
  1441      * Sets the thread factory used to create new threads.
       
  1442      *
       
  1443      * @param threadFactory the new thread factory
       
  1444      * @throws NullPointerException if threadFactory is null
       
  1445      * @see #getThreadFactory
       
  1446      */
       
  1447     public void setThreadFactory(ThreadFactory threadFactory) {
       
  1448         if (threadFactory == null)
       
  1449             throw new NullPointerException();
       
  1450         this.threadFactory = threadFactory;
       
  1451     }
       
  1452 
       
  1453     /**
       
  1454      * Returns the thread factory used to create new threads.
       
  1455      *
       
  1456      * @return the current thread factory
       
  1457      * @see #setThreadFactory
       
  1458      */
       
  1459     public ThreadFactory getThreadFactory() {
       
  1460         return threadFactory;
       
  1461     }
       
  1462 
       
  1463     /**
       
  1464      * Sets a new handler for unexecutable tasks.
       
  1465      *
       
  1466      * @param handler the new handler
       
  1467      * @throws NullPointerException if handler is null
       
  1468      * @see #getRejectedExecutionHandler
       
  1469      */
       
  1470     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
       
  1471         if (handler == null)
       
  1472             throw new NullPointerException();
       
  1473         this.handler = handler;
       
  1474     }
       
  1475 
       
  1476     /**
       
  1477      * Returns the current handler for unexecutable tasks.
       
  1478      *
       
  1479      * @return the current handler
       
  1480      * @see #setRejectedExecutionHandler
       
  1481      */
       
  1482     public RejectedExecutionHandler getRejectedExecutionHandler() {
       
  1483         return handler;
       
  1484     }
       
  1485 
       
  1486     /**
       
  1487      * Sets the core number of threads.  This overrides any value set
       
  1488      * in the constructor.  If the new value is smaller than the
       
  1489      * current value, excess existing threads will be terminated when
       
  1490      * they next become idle.  If larger, new threads will, if needed,
       
  1491      * be started to execute any queued tasks.
       
  1492      *
       
  1493      * @param corePoolSize the new core size
       
  1494      * @throws IllegalArgumentException if {@code corePoolSize < 0}
       
  1495      * @see #getCorePoolSize
       
  1496      */
       
  1497     public void setCorePoolSize(int corePoolSize) {
       
  1498         if (corePoolSize < 0)
       
  1499             throw new IllegalArgumentException();
       
  1500         int delta = corePoolSize - this.corePoolSize;
       
  1501         this.corePoolSize = corePoolSize;
       
  1502         if (workerCountOf(ctl.get()) > corePoolSize)
       
  1503             interruptIdleWorkers();
       
  1504         else if (delta > 0) {
       
  1505             // We don't really know how many new threads are "needed".
       
  1506             // As a heuristic, prestart enough new workers (up to new
       
  1507             // core size) to handle the current number of tasks in
       
  1508             // queue, but stop if queue becomes empty while doing so.
       
  1509             int k = Math.min(delta, workQueue.size());
       
  1510             while (k-- > 0 && addWorker(null, true)) {
       
  1511                 if (workQueue.isEmpty())
       
  1512                     break;
       
  1513             }
       
  1514         }
       
  1515     }
       
  1516 
       
  1517     /**
       
  1518      * Returns the core number of threads.
       
  1519      *
       
  1520      * @return the core number of threads
       
  1521      * @see #setCorePoolSize
       
  1522      */
       
  1523     public int getCorePoolSize() {
       
  1524         return corePoolSize;
       
  1525     }
       
  1526 
       
  1527     /**
       
  1528      * Starts a core thread, causing it to idly wait for work. This
       
  1529      * overrides the default policy of starting core threads only when
       
  1530      * new tasks are executed. This method will return {@code false}
       
  1531      * if all core threads have already been started.
       
  1532      *
       
  1533      * @return {@code true} if a thread was started
       
  1534      */
       
  1535     public boolean prestartCoreThread() {
       
  1536         return workerCountOf(ctl.get()) < corePoolSize &&
       
  1537             addWorker(null, true);
       
  1538     }
       
  1539 
       
  1540     /**
       
  1541      * Starts all core threads, causing them to idly wait for work. This
       
  1542      * overrides the default policy of starting core threads only when
       
  1543      * new tasks are executed.
       
  1544      *
       
  1545      * @return the number of threads started
       
  1546      */
       
  1547     public int prestartAllCoreThreads() {
       
  1548         int n = 0;
       
  1549         while (addWorker(null, true))
       
  1550             ++n;
       
  1551         return n;
       
  1552     }
       
  1553 
       
  1554     /**
       
  1555      * Returns true if this pool allows core threads to time out and
       
  1556      * terminate if no tasks arrive within the keepAlive time, being
       
  1557      * replaced if needed when new tasks arrive. When true, the same
       
  1558      * keep-alive policy applying to non-core threads applies also to
       
  1559      * core threads. When false (the default), core threads are never
       
  1560      * terminated due to lack of incoming tasks.
       
  1561      *
       
  1562      * @return {@code true} if core threads are allowed to time out,
       
  1563      *         else {@code false}
       
  1564      *
       
  1565      * @since 1.6
       
  1566      */
       
  1567     public boolean allowsCoreThreadTimeOut() {
       
  1568         return allowCoreThreadTimeOut;
       
  1569     }
       
  1570 
       
  1571     /**
       
  1572      * Sets the policy governing whether core threads may time out and
       
  1573      * terminate if no tasks arrive within the keep-alive time, being
       
  1574      * replaced if needed when new tasks arrive. When false, core
       
  1575      * threads are never terminated due to lack of incoming
       
  1576      * tasks. When true, the same keep-alive policy applying to
       
  1577      * non-core threads applies also to core threads. To avoid
       
  1578      * continual thread replacement, the keep-alive time must be
       
  1579      * greater than zero when setting {@code true}. This method
       
  1580      * should in general be called before the pool is actively used.
       
  1581      *
       
  1582      * @param value {@code true} if should time out, else {@code false}
       
  1583      * @throws IllegalArgumentException if value is {@code true}
       
  1584      *         and the current keep-alive time is not greater than zero
       
  1585      *
       
  1586      * @since 1.6
       
  1587      */
       
  1588     public void allowCoreThreadTimeOut(boolean value) {
       
  1589         if (value && keepAliveTime <= 0)
       
  1590             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
       
  1591         if (value != allowCoreThreadTimeOut) {
       
  1592             allowCoreThreadTimeOut = value;
       
  1593             if (value)
       
  1594                 interruptIdleWorkers();
       
  1595         }
       
  1596     }
       
  1597 
       
  1598     /**
       
  1599      * Sets the maximum allowed number of threads. This overrides any
       
  1600      * value set in the constructor. If the new value is smaller than
       
  1601      * the current value, excess existing threads will be
       
  1602      * terminated when they next become idle.
       
  1603      *
       
  1604      * @param maximumPoolSize the new maximum
       
  1605      * @throws IllegalArgumentException if the new maximum is
       
  1606      *         less than or equal to zero, or
       
  1607      *         less than the {@linkplain #getCorePoolSize core pool size}
       
  1608      * @see #getMaximumPoolSize
       
  1609      */
       
  1610     public void setMaximumPoolSize(int maximumPoolSize) {
       
  1611         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
       
  1612             throw new IllegalArgumentException();
       
  1613         this.maximumPoolSize = maximumPoolSize;
       
  1614         if (workerCountOf(ctl.get()) > maximumPoolSize)
       
  1615             interruptIdleWorkers();
       
  1616     }
       
  1617 
       
  1618     /**
       
  1619      * Returns the maximum allowed number of threads.
       
  1620      *
       
  1621      * @return the maximum allowed number of threads
       
  1622      * @see #setMaximumPoolSize
       
  1623      */
       
  1624     public int getMaximumPoolSize() {
       
  1625         return maximumPoolSize;
       
  1626     }
       
  1627 
       
  1628     /**
       
  1629      * Sets the time limit for which threads may remain idle before
       
  1630      * being terminated.  If there are more than the core number of
       
  1631      * threads currently in the pool, after waiting this amount of
       
  1632      * time without processing a task, excess threads will be
       
  1633      * terminated.  This overrides any value set in the constructor.
       
  1634      *
       
  1635      * @param time the time to wait.  A time value of zero will cause
       
  1636      *        excess threads to terminate immediately after executing tasks.
       
  1637      * @param unit the time unit of the {@code time} argument
       
  1638      * @throws IllegalArgumentException if {@code time} less than zero or
       
  1639      *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
       
  1640      * @see #getKeepAliveTime
       
  1641      */
       
  1642     public void setKeepAliveTime(long time, TimeUnit unit) {
       
  1643         if (time < 0)
       
  1644             throw new IllegalArgumentException();
       
  1645         if (time == 0 && allowsCoreThreadTimeOut())
       
  1646             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
       
  1647         long keepAliveTime = unit.toNanos(time);
       
  1648         long delta = keepAliveTime - this.keepAliveTime;
       
  1649         this.keepAliveTime = keepAliveTime;
       
  1650         if (delta < 0)
       
  1651             interruptIdleWorkers();
       
  1652     }
       
  1653 
       
  1654     /**
       
  1655      * Returns the thread keep-alive time, which is the amount of time
       
  1656      * that threads in excess of the core pool size may remain
       
  1657      * idle before being terminated.
       
  1658      *
       
  1659      * @param unit the desired time unit of the result
       
  1660      * @return the time limit
       
  1661      * @see #setKeepAliveTime
       
  1662      */
       
  1663     public long getKeepAliveTime(TimeUnit unit) {
       
  1664         return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
       
  1665     }
       
  1666 
       
  1667     /* User-level queue utilities */
       
  1668 
       
  1669     /**
       
  1670      * Returns the task queue used by this executor. Access to the
       
  1671      * task queue is intended primarily for debugging and monitoring.
       
  1672      * This queue may be in active use.  Retrieving the task queue
       
  1673      * does not prevent queued tasks from executing.
       
  1674      *
       
  1675      * @return the task queue
       
  1676      */
       
  1677     public BlockingQueue<Runnable> getQueue() {
       
  1678         return workQueue;
       
  1679     }
       
  1680 
       
  1681     /**
       
  1682      * Removes this task from the executor's internal queue if it is
       
  1683      * present, thus causing it not to be run if it has not already
       
  1684      * started.
       
  1685      *
       
  1686      * <p> This method may be useful as one part of a cancellation
       
  1687      * scheme.  It may fail to remove tasks that have been converted
       
  1688      * into other forms before being placed on the internal queue. For
       
  1689      * example, a task entered using {@code submit} might be
       
  1690      * converted into a form that maintains {@code Future} status.
       
  1691      * However, in such cases, method {@link #purge} may be used to
       
  1692      * remove those Futures that have been cancelled.
       
  1693      *
       
  1694      * @param task the task to remove
       
  1695      * @return true if the task was removed
       
  1696      */
       
  1697     public boolean remove(Runnable task) {
       
  1698         boolean removed = workQueue.remove(task);
       
  1699         tryTerminate(); // In case SHUTDOWN and now empty
       
  1700         return removed;
       
  1701     }
       
  1702 
       
  1703     /**
       
  1704      * Tries to remove from the work queue all {@link Future}
       
  1705      * tasks that have been cancelled. This method can be useful as a
       
  1706      * storage reclamation operation, that has no other impact on
       
  1707      * functionality. Cancelled tasks are never executed, but may
       
  1708      * accumulate in work queues until worker threads can actively
       
  1709      * remove them. Invoking this method instead tries to remove them now.
       
  1710      * However, this method may fail to remove tasks in
       
  1711      * the presence of interference by other threads.
       
  1712      */
       
  1713     public void purge() {
       
  1714         final BlockingQueue<Runnable> q = workQueue;
       
  1715         try {
       
  1716             Iterator<Runnable> it = q.iterator();
       
  1717             while (it.hasNext()) {
       
  1718                 Runnable r = it.next();
       
  1719                 if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
       
  1720                     it.remove();
       
  1721             }
       
  1722         } catch (ConcurrentModificationException fallThrough) {
       
  1723             // Take slow path if we encounter interference during traversal.
       
  1724             // Make copy for traversal and call remove for cancelled entries.
       
  1725             // The slow path is more likely to be O(N*N).
       
  1726             for (Object r : q.toArray())
       
  1727                 if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
       
  1728                     q.remove(r);
       
  1729         }
       
  1730 
       
  1731         tryTerminate(); // In case SHUTDOWN and now empty
       
  1732     }
       
  1733 
       
  1734     /* Statistics */
       
  1735 
       
  1736     /**
       
  1737      * Returns the current number of threads in the pool.
       
  1738      *
       
  1739      * @return the number of threads
       
  1740      */
       
  1741     public int getPoolSize() {
       
  1742         final ReentrantLock mainLock = this.mainLock;
       
  1743         mainLock.lock();
       
  1744         try {
       
  1745             // Remove rare and surprising possibility of
       
  1746             // isTerminated() && getPoolSize() > 0
       
  1747             return runStateAtLeast(ctl.get(), TIDYING) ? 0
       
  1748                 : workers.size();
       
  1749         } finally {
       
  1750             mainLock.unlock();
       
  1751         }
       
  1752     }
       
  1753 
       
  1754     /**
       
  1755      * Returns the approximate number of threads that are actively
       
  1756      * executing tasks.
       
  1757      *
       
  1758      * @return the number of threads
       
  1759      */
       
  1760     public int getActiveCount() {
       
  1761         final ReentrantLock mainLock = this.mainLock;
       
  1762         mainLock.lock();
       
  1763         try {
       
  1764             int n = 0;
       
  1765             for (Worker w : workers)
       
  1766                 if (w.isLocked())
       
  1767                     ++n;
       
  1768             return n;
       
  1769         } finally {
       
  1770             mainLock.unlock();
       
  1771         }
       
  1772     }
       
  1773 
       
  1774     /**
       
  1775      * Returns the largest number of threads that have ever
       
  1776      * simultaneously been in the pool.
       
  1777      *
       
  1778      * @return the number of threads
       
  1779      */
       
  1780     public int getLargestPoolSize() {
       
  1781         final ReentrantLock mainLock = this.mainLock;
       
  1782         mainLock.lock();
       
  1783         try {
       
  1784             return largestPoolSize;
       
  1785         } finally {
       
  1786             mainLock.unlock();
       
  1787         }
       
  1788     }
       
  1789 
       
  1790     /**
       
  1791      * Returns the approximate total number of tasks that have ever been
       
  1792      * scheduled for execution. Because the states of tasks and
       
  1793      * threads may change dynamically during computation, the returned
       
  1794      * value is only an approximation.
       
  1795      *
       
  1796      * @return the number of tasks
       
  1797      */
       
  1798     public long getTaskCount() {
       
  1799         final ReentrantLock mainLock = this.mainLock;
       
  1800         mainLock.lock();
       
  1801         try {
       
  1802             long n = completedTaskCount;
       
  1803             for (Worker w : workers) {
       
  1804                 n += w.completedTasks;
       
  1805                 if (w.isLocked())
       
  1806                     ++n;
       
  1807             }
       
  1808             return n + workQueue.size();
       
  1809         } finally {
       
  1810             mainLock.unlock();
       
  1811         }
       
  1812     }
       
  1813 
       
  1814     /**
       
  1815      * Returns the approximate total number of tasks that have
       
  1816      * completed execution. Because the states of tasks and threads
       
  1817      * may change dynamically during computation, the returned value
       
  1818      * is only an approximation, but one that does not ever decrease
       
  1819      * across successive calls.
       
  1820      *
       
  1821      * @return the number of tasks
       
  1822      */
       
  1823     public long getCompletedTaskCount() {
       
  1824         final ReentrantLock mainLock = this.mainLock;
       
  1825         mainLock.lock();
       
  1826         try {
       
  1827             long n = completedTaskCount;
       
  1828             for (Worker w : workers)
       
  1829                 n += w.completedTasks;
       
  1830             return n;
       
  1831         } finally {
       
  1832             mainLock.unlock();
       
  1833         }
       
  1834     }
       
  1835 
       
  1836     /* Extension hooks */
       
  1837 
       
  1838     /**
       
  1839      * Method invoked prior to executing the given Runnable in the
       
  1840      * given thread.  This method is invoked by thread {@code t} that
       
  1841      * will execute task {@code r}, and may be used to re-initialize
       
  1842      * ThreadLocals, or to perform logging.
       
  1843      *
       
  1844      * <p>This implementation does nothing, but may be customized in
       
  1845      * subclasses. Note: To properly nest multiple overridings, subclasses
       
  1846      * should generally invoke {@code super.beforeExecute} at the end of
       
  1847      * this method.
       
  1848      *
       
  1849      * @param t the thread that will run task {@code r}
       
  1850      * @param r the task that will be executed
       
  1851      */
       
  1852     protected void beforeExecute(Thread t, Runnable r) { }
       
  1853 
       
  1854     /**
       
  1855      * Method invoked upon completion of execution of the given Runnable.
       
  1856      * This method is invoked by the thread that executed the task. If
       
  1857      * non-null, the Throwable is the uncaught {@code RuntimeException}
       
  1858      * or {@code Error} that caused execution to terminate abruptly.
       
  1859      *
       
  1860      * <p>This implementation does nothing, but may be customized in
       
  1861      * subclasses. Note: To properly nest multiple overridings, subclasses
       
  1862      * should generally invoke {@code super.afterExecute} at the
       
  1863      * beginning of this method.
       
  1864      *
       
  1865      * <p><b>Note:</b> When actions are enclosed in tasks (such as
       
  1866      * {@link FutureTask}) either explicitly or via methods such as
       
  1867      * {@code submit}, these task objects catch and maintain
       
  1868      * computational exceptions, and so they do not cause abrupt
       
  1869      * termination, and the internal exceptions are <em>not</em>
       
  1870      * passed to this method. If you would like to trap both kinds of
       
  1871      * failures in this method, you can further probe for such cases,
       
  1872      * as in this sample subclass that prints either the direct cause
       
  1873      * or the underlying exception if a task has been aborted:
       
  1874      *
       
  1875      *  <pre> {@code
       
  1876      * class ExtendedExecutor extends ThreadPoolExecutor {
       
  1877      *   // ...
       
  1878      *   protected void afterExecute(Runnable r, Throwable t) {
       
  1879      *     super.afterExecute(r, t);
       
  1880      *     if (t == null && r instanceof Future<?>) {
       
  1881      *       try {
       
  1882      *         Object result = ((Future<?>) r).get();
       
  1883      *       } catch (CancellationException ce) {
       
  1884      *           t = ce;
       
  1885      *       } catch (ExecutionException ee) {
       
  1886      *           t = ee.getCause();
       
  1887      *       } catch (InterruptedException ie) {
       
  1888      *           Thread.currentThread().interrupt(); // ignore/reset
       
  1889      *       }
       
  1890      *     }
       
  1891      *     if (t != null)
       
  1892      *       System.out.println(t);
       
  1893      *   }
       
  1894      * }}</pre>
       
  1895      *
       
  1896      * @param r the runnable that has completed
       
  1897      * @param t the exception that caused termination, or null if
       
  1898      * execution completed normally
       
  1899      */
       
  1900     protected void afterExecute(Runnable r, Throwable t) { }
       
  1901 
       
  1902     /**
       
  1903      * Method invoked when the Executor has terminated.  Default
       
  1904      * implementation does nothing. Note: To properly nest multiple
       
  1905      * overridings, subclasses should generally invoke
       
  1906      * {@code super.terminated} within this method.
       
  1907      */
       
  1908     protected void terminated() { }
       
  1909 
       
  1910     /* Predefined RejectedExecutionHandlers */
       
  1911 
       
  1912     /**
       
  1913      * A handler for rejected tasks that runs the rejected task
       
  1914      * directly in the calling thread of the {@code execute} method,
       
  1915      * unless the executor has been shut down, in which case the task
       
  1916      * is discarded.
       
  1917      */
       
  1918     public static class CallerRunsPolicy implements RejectedExecutionHandler {
       
  1919         /**
       
  1920          * Creates a {@code CallerRunsPolicy}.
       
  1921          */
       
  1922         public CallerRunsPolicy() { }
       
  1923 
       
  1924         /**
       
  1925          * Executes task r in the caller's thread, unless the executor
       
  1926          * has been shut down, in which case the task is discarded.
       
  1927          *
       
  1928          * @param r the runnable task requested to be executed
       
  1929          * @param e the executor attempting to execute this task
       
  1930          */
       
  1931         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       
  1932             if (!e.isShutdown()) {
       
  1933                 r.run();
       
  1934             }
       
  1935         }
       
  1936     }
       
  1937 
       
  1938     /**
       
  1939      * A handler for rejected tasks that throws a
       
  1940      * {@code RejectedExecutionException}.
       
  1941      */
       
  1942     public static class AbortPolicy implements RejectedExecutionHandler {
       
  1943         /**
       
  1944          * Creates an {@code AbortPolicy}.
       
  1945          */
       
  1946         public AbortPolicy() { }
       
  1947 
       
  1948         /**
       
  1949          * Always throws RejectedExecutionException.
       
  1950          *
       
  1951          * @param r the runnable task requested to be executed
       
  1952          * @param e the executor attempting to execute this task
       
  1953          * @throws RejectedExecutionException always.
       
  1954          */
       
  1955         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       
  1956             throw new RejectedExecutionException();
       
  1957         }
       
  1958     }
       
  1959 
       
  1960     /**
       
  1961      * A handler for rejected tasks that silently discards the
       
  1962      * rejected task.
       
  1963      */
       
  1964     public static class DiscardPolicy implements RejectedExecutionHandler {
       
  1965         /**
       
  1966          * Creates a {@code DiscardPolicy}.
       
  1967          */
       
  1968         public DiscardPolicy() { }
       
  1969 
       
  1970         /**
       
  1971          * Does nothing, which has the effect of discarding task r.
       
  1972          *
       
  1973          * @param r the runnable task requested to be executed
       
  1974          * @param e the executor attempting to execute this task
       
  1975          */
       
  1976         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       
  1977         }
       
  1978     }
       
  1979 
       
  1980     /**
       
  1981      * A handler for rejected tasks that discards the oldest unhandled
       
  1982      * request and then retries {@code execute}, unless the executor
       
  1983      * is shut down, in which case the task is discarded.
       
  1984      */
       
  1985     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
       
  1986         /**
       
  1987          * Creates a {@code DiscardOldestPolicy} for the given executor.
       
  1988          */
       
  1989         public DiscardOldestPolicy() { }
       
  1990 
       
  1991         /**
       
  1992          * Obtains and ignores the next task that the executor
       
  1993          * would otherwise execute, if one is immediately available,
       
  1994          * and then retries execution of task r, unless the executor
       
  1995          * is shut down, in which case task r is instead discarded.
       
  1996          *
       
  1997          * @param r the runnable task requested to be executed
       
  1998          * @param e the executor attempting to execute this task
       
  1999          */
       
  2000         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       
  2001             if (!e.isShutdown()) {
       
  2002                 e.getQueue().poll();
       
  2003                 e.execute(r);
       
  2004             }
       
  2005         }
       
  2006     }
       
  2007 }