jdk/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java
changeset 16079 7252530399e2
parent 10609 0caee7f44e7b
child 18790 d25399d849bc
--- a/jdk/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java	Tue Oct 16 10:56:25 2012 -0700
+++ b/jdk/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java	Wed Aug 22 21:40:19 2012 -0400
@@ -34,8 +34,10 @@
  */
 
 package java.util.concurrent;
-import java.util.concurrent.locks.*;
-import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.*;
 
 /**
@@ -491,10 +493,15 @@
      * policy limiting the number of threads.  Even though it is not
      * treated as an error, failure to create threads may result in
      * new tasks being rejected or existing ones remaining stuck in
-     * the queue. On the other hand, no special precautions exist to
-     * handle OutOfMemoryErrors that might be thrown while trying to
-     * create threads, since there is generally no recourse from
-     * within this class.
+     * the queue.
+     *
+     * We go further and preserve pool invariants even in the face of
+     * errors such as OutOfMemoryError, that might be thrown while
+     * trying to create threads.  Such errors are rather common due to
+     * the need to allocate a native stack in Thread#start, and users
+     * will want to perform clean pool shutdown to clean up.  There
+     * will likely be enough memory available for the cleanup code to
+     * complete without encountering yet another OutOfMemoryError.
      */
     private volatile ThreadFactory threadFactory;
 
@@ -568,9 +575,13 @@
      * task execution.  This protects against interrupts that are
      * intended to wake up a worker thread waiting for a task from
      * instead interrupting a task being run.  We implement a simple
-     * non-reentrant mutual exclusion lock rather than use ReentrantLock
-     * because we do not want worker tasks to be able to reacquire the
-     * lock when they invoke pool control methods like setCorePoolSize.
+     * non-reentrant mutual exclusion lock rather than use
+     * ReentrantLock because we do not want worker tasks to be able to
+     * reacquire the lock when they invoke pool control methods like
+     * setCorePoolSize.  Additionally, to suppress interrupts until
+     * the thread actually starts running tasks, we initialize lock
+     * state to a negative value, and clear it upon start (in
+     * runWorker).
      */
     private final class Worker
         extends AbstractQueuedSynchronizer
@@ -594,6 +605,7 @@
          * @param firstTask the first task (null if none)
          */
         Worker(Runnable firstTask) {
+            setState(-1); // inhibit interrupts until runWorker
             this.firstTask = firstTask;
             this.thread = getThreadFactory().newThread(this);
         }
@@ -609,7 +621,7 @@
         // The value 1 represents the locked state.
 
         protected boolean isHeldExclusively() {
-            return getState() == 1;
+            return getState() != 0;
         }
 
         protected boolean tryAcquire(int unused) {
@@ -630,6 +642,16 @@
         public boolean tryLock()  { return tryAcquire(1); }
         public void unlock()      { release(1); }
         public boolean isLocked() { return isHeldExclusively(); }
+
+        void interruptIfStarted() {
+            Thread t;
+            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
+                try {
+                    t.interrupt();
+                } catch (SecurityException ignore) {
+                }
+            }
+        }
     }
 
     /*
@@ -728,12 +750,8 @@
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
-            for (Worker w : workers) {
-                try {
-                    w.thread.interrupt();
-                } catch (SecurityException ignore) {
-                }
-            }
+            for (Worker w : workers)
+                w.interruptIfStarted();
         } finally {
             mainLock.unlock();
         }
@@ -790,19 +808,6 @@
 
     private static final boolean ONLY_ONE = true;
 
-    /**
-     * Ensures that unless the pool is stopping, the current thread
-     * does not have its interrupt set. This requires a double-check
-     * of state in case the interrupt was cleared concurrently with a
-     * shutdownNow -- if so, the interrupt is re-enabled.
-     */
-    private void clearInterruptsForTaskRun() {
-        if (runStateLessThan(ctl.get(), STOP) &&
-            Thread.interrupted() &&
-            runStateAtLeast(ctl.get(), STOP))
-            Thread.currentThread().interrupt();
-    }
-
     /*
      * Misc utilities, most of which are also exported to
      * ScheduledThreadPoolExecutor
@@ -862,12 +867,13 @@
      * Checks if a new worker can be added with respect to current
      * pool state and the given bound (either core or maximum). If so,
      * the worker count is adjusted accordingly, and, if possible, a
-     * new worker is created and started running firstTask as its
+     * new worker is created and started, running firstTask as its
      * first task. This method returns false if the pool is stopped or
      * eligible to shut down. It also returns false if the thread
-     * factory fails to create a thread when asked, which requires a
-     * backout of workerCount, and a recheck for termination, in case
-     * the existence of this worker was holding up termination.
+     * factory fails to create a thread when asked.  If the thread
+     * creation fails, either due to the thread factory returning
+     * null, or due to an exception (typically OutOfMemoryError in
+     * Thread#start), we roll back cleanly.
      *
      * @param firstTask the task the new thread should run first (or
      * null if none). Workers are created with an initial first task
@@ -910,46 +916,65 @@
             }
         }
 
-        Worker w = new Worker(firstTask);
-        Thread t = w.thread;
+        boolean workerStarted = false;
+        boolean workerAdded = false;
+        Worker w = null;
+        try {
+            final ReentrantLock mainLock = this.mainLock;
+            w = new Worker(firstTask);
+            final Thread t = w.thread;
+            if (t != null) {
+                mainLock.lock();
+                try {
+                    // Recheck while holding lock.
+                    // Back out on ThreadFactory failure or if
+                    // shut down before lock acquired.
+                    int c = ctl.get();
+                    int rs = runStateOf(c);
 
+                    if (rs < SHUTDOWN ||
+                        (rs == SHUTDOWN && firstTask == null)) {
+                        if (t.isAlive()) // precheck that t is startable
+                            throw new IllegalThreadStateException();
+                        workers.add(w);
+                        int s = workers.size();
+                        if (s > largestPoolSize)
+                            largestPoolSize = s;
+                        workerAdded = true;
+                    }
+                } finally {
+                    mainLock.unlock();
+                }
+                if (workerAdded) {
+                    t.start();
+                    workerStarted = true;
+                }
+            }
+        } finally {
+            if (! workerStarted)
+                addWorkerFailed(w);
+        }
+        return workerStarted;
+    }
+
+    /**
+     * Rolls back the worker thread creation.
+     * - removes worker from workers, if present
+     * - decrements worker count
+     * - rechecks for termination, in case the existence of this
+     *   worker was holding up termination
+     */
+    private void addWorkerFailed(Worker w) {
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
-            // Recheck while holding lock.
-            // Back out on ThreadFactory failure or if
-            // shut down before lock acquired.
-            int c = ctl.get();
-            int rs = runStateOf(c);
-
-            if (t == null ||
-                (rs >= SHUTDOWN &&
-                 ! (rs == SHUTDOWN &&
-                    firstTask == null))) {
-                decrementWorkerCount();
-                tryTerminate();
-                return false;
-            }
-
-            workers.add(w);
-
-            int s = workers.size();
-            if (s > largestPoolSize)
-                largestPoolSize = s;
+            if (w != null)
+                workers.remove(w);
+            decrementWorkerCount();
+            tryTerminate();
         } finally {
             mainLock.unlock();
         }
-
-        t.start();
-        // It is possible (but unlikely) for a thread to have been
-        // added to workers, but not yet started, during transition to
-        // STOP, which could result in a rare missed interrupt,
-        // because Thread.interrupt is not guaranteed to have any effect
-        // on a non-yet-started Thread (see Thread#interrupt).
-        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
-            t.interrupt();
-
-        return true;
     }
 
     /**
@@ -1096,15 +1121,25 @@
      * @param w the worker
      */
     final void runWorker(Worker w) {
+        Thread wt = Thread.currentThread();
         Runnable task = w.firstTask;
         w.firstTask = null;
+        w.unlock(); // allow interrupts
         boolean completedAbruptly = true;
         try {
             while (task != null || (task = getTask()) != null) {
                 w.lock();
-                clearInterruptsForTaskRun();
+                // If pool is stopping, ensure thread is interrupted;
+                // if not, ensure thread is not interrupted.  This
+                // requires a recheck in second case to deal with
+                // shutdownNow race while clearing interrupt
+                if ((runStateAtLeast(ctl.get(), STOP) ||
+                     (Thread.interrupted() &&
+                      runStateAtLeast(ctl.get(), STOP))) &&
+                    !wt.isInterrupted())
+                    wt.interrupt();
                 try {
-                    beforeExecute(w.thread, task);
+                    beforeExecute(wt, task);
                     Throwable thrown = null;
                     try {
                         task.run();
@@ -2064,3 +2099,4 @@
         }
     }
 }
+