corba/src/share/classes/com/sun/corba/se/impl/orbutil/threadpool/ThreadPoolImpl.java
changeset 13171 1ac5e9a54a6e
parent 5555 b2b5ed3f0d0d
--- a/corba/src/share/classes/com/sun/corba/se/impl/orbutil/threadpool/ThreadPoolImpl.java	Wed Jul 05 18:14:56 2017 +0200
+++ b/corba/src/share/classes/com/sun/corba/se/impl/orbutil/threadpool/ThreadPoolImpl.java	Wed Jun 27 21:09:29 2012 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2003, 2004, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -25,6 +25,18 @@
 
 package com.sun.corba.se.impl.orbutil.threadpool;
 
+import java.io.IOException;
+import java.io.Closeable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
 import com.sun.corba.se.spi.orbutil.threadpool.Work;
@@ -36,12 +48,27 @@
 import com.sun.corba.se.spi.monitoring.MonitoringConstants;
 import com.sun.corba.se.spi.monitoring.MonitoredObject;
 import com.sun.corba.se.spi.monitoring.MonitoringFactories;
+import com.sun.corba.se.spi.orb.ORB;
 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
 
+import com.sun.corba.se.impl.logging.ORBUtilSystemException;
+import com.sun.corba.se.impl.orbutil.ORBConstants;
+import com.sun.corba.se.spi.logging.CORBALogDomains;
+
 public class ThreadPoolImpl implements ThreadPool
 {
-    private static int threadCounter = 0; // serial counter useful for debugging
+    // serial counter useful for debugging
+    private static AtomicInteger threadCounter = new AtomicInteger(0);
+    private static final ORBUtilSystemException wrapper =
+        ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT);
+
 
+    // Any time currentThreadCount and/or availableWorkerThreads is updated
+    // or accessed this ThreadPool's WorkQueue must be locked. And, it is
+    // expected that this ThreadPool's WorkQueue is the only object that
+    // updates and accesses these values directly and indirectly though a
+    // call to a method in this ThreadPool. If any call to update or access
+    // those values must synchronized on this ThreadPool's WorkQueue.
     private WorkQueue workQueue;
 
     // Stores the number of available worker threads
@@ -65,14 +92,11 @@
     // Running count of the work items processed
     // Set the value to 1 so that divide by zero is avoided in
     // averageWorkCompletionTime()
-    private long processedCount = 1;
+    private AtomicLong processedCount = new AtomicLong(1);
 
     // Running aggregate of the time taken in millis to execute work items
     // processed by the threads in the threadpool
-    private long totalTimeTaken = 0;
-
-    // Lock for protecting state when required
-    private Object lock = new Object();
+    private AtomicLong totalTimeTaken = new AtomicLong(0);
 
     // Name of the ThreadPool
     private String name;
@@ -81,7 +105,10 @@
     private MonitoredObject threadpoolMonitoredObject;
 
     // ThreadGroup in which threads should be created
-    private ThreadGroup threadGroup ;
+    private ThreadGroup threadGroup;
+
+    Object workersLock = new Object();
+    List<WorkerThread> workers = new ArrayList<>();
 
     /**
      * This constructor is used to create an unbounded threadpool
@@ -90,7 +117,7 @@
         inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
         maxWorkerThreads = Integer.MAX_VALUE;
         workQueue = new WorkQueueImpl(this);
-        threadGroup = tg ;
+        threadGroup = tg;
         name = threadpoolName;
         initializeMonitoring();
     }
@@ -121,6 +148,30 @@
         initializeMonitoring();
     }
 
+    // Note that this method should not return until AFTER all threads have died.
+    public void close() throws IOException {
+
+        // Copy to avoid concurrent modification problems.
+        List<WorkerThread> copy = null;
+        synchronized (workersLock) {
+            copy = new ArrayList<>(workers);
+        }
+
+        for (WorkerThread wt : copy) {
+            wt.close();
+            while (wt.getState() != Thread.State.TERMINATED) {
+                try {
+                    wt.join();
+                } catch (InterruptedException exc) {
+                    wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this);
+                }
+            }
+        }
+
+        threadGroup = null;
+    }
+
+
     // Setup monitoring for this threadpool
     private void initializeMonitoring() {
         // Get root monitored object
@@ -217,8 +268,8 @@
      * or notify waiting threads on the queue for available work
      */
     void notifyForAvailableWork(WorkQueue aWorkQueue) {
-        synchronized (lock) {
-            if (availableWorkerThreads == 0) {
+        synchronized (aWorkQueue) {
+            if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) {
                 createWorkerThread();
             } else {
                 aWorkQueue.notify();
@@ -227,120 +278,145 @@
     }
 
 
+    private Thread createWorkerThreadHelper( String name ) {
+        // Thread creation needs to be in a doPrivileged block
+        // if there is a non-null security manager for two reasons:
+        // 1. The creation of a thread in a specific ThreadGroup
+        //    is a privileged operation.  Lack of a doPrivileged
+        //    block here causes an AccessControlException
+        //    (see bug 6268145).
+        // 2. We want to make sure that the permissions associated
+        //    with this thread do NOT include the permissions of
+        //    the current thread that is calling this method.
+        //    This leads to problems in the app server where
+        //    some threads in the ThreadPool randomly get
+        //    bad permissions, leading to unpredictable
+        //    permission errors (see bug 6021011).
+        //
+        //    A Java thread contains a stack of call frames,
+        //    one for each method called that has not yet returned.
+        //    Each method comes from a particular class.  The class
+        //    was loaded by a ClassLoader which has an associated
+        //    CodeSource, and this determines the Permissions
+        //    for all methods in that class.  The current
+        //    Permissions for the thread are the intersection of
+        //    all Permissions for the methods on the stack.
+        //    This is part of the Security Context of the thread.
+        //
+        //    When a thread creates a new thread, the new thread
+        //    inherits the security context of the old thread.
+        //    This is bad in a ThreadPool, because different
+        //    creators of threads may have different security contexts.
+        //    This leads to occasional unpredictable errors when
+        //    a thread is re-used in a different security context.
+        //
+        //    Avoiding this problem is simple: just do the thread
+        //    creation in a doPrivileged block.  This sets the
+        //    inherited security context to that of the code source
+        //    for the ORB code itself, which contains all permissions
+        //    in either Java SE or Java EE.
+        WorkerThread thread = new WorkerThread(threadGroup, name);
+        synchronized (workersLock) {
+            workers.add(thread);
+        }
+
+        // The thread must be set to a daemon thread so the
+        // VM can exit if the only threads left are PooledThreads
+        // or other daemons.  We don't want to rely on the
+        // calling thread always being a daemon.
+        // Note that no exception is possible here since we
+        // are inside the doPrivileged block.
+        thread.setDaemon(true);
+
+        wrapper.workerThreadCreated(thread, thread.getContextClassLoader());
+
+        thread.start();
+        return null;
+    }
+
+
     /**
      * To be called from the workqueue to create worker threads when none
      * available.
      */
     void createWorkerThread() {
-        WorkerThread thread;
-
-        synchronized (lock) {
-            if (boundedThreadPool) {
-                if (currentThreadCount < maxWorkerThreads) {
-                    thread = new WorkerThread(threadGroup, getName());
-                    currentThreadCount++;
+        final String name = getName();
+        synchronized (workQueue) {
+            try {
+                if (System.getSecurityManager() == null) {
+                    createWorkerThreadHelper(name);
                 } else {
-                    // REVIST - Need to create a thread to monitor the
-                    // the state for deadlock i.e. all threads waiting for
-                    // something which can be got from the item in the
-                    // workqueue, but there is no thread available to
-                    // process that work item - DEADLOCK !!
-                    return;
+                    // If we get here, we need to create a thread.
+                    AccessController.doPrivileged(
+                            new PrivilegedAction() {
+                        public Object run() {
+                            return createWorkerThreadHelper(name);
+                        }
+                    }
+                    );
                 }
-            } else {
-                thread = new WorkerThread(threadGroup, getName());
-                currentThreadCount++;
+            } catch (Throwable t) {
+                // Decrementing the count of current worker threads.
+                // But, it will be increased in the finally block.
+                decrementCurrentNumberOfThreads();
+                wrapper.workerThreadCreationFailure(t);
+            } finally {
+                incrementCurrentNumberOfThreads();
             }
         }
-
-        // The thread must be set to a daemon thread so the
-        // VM can exit if the only threads left are PooledThreads
-        // or other daemons.  We don't want to rely on the
-        // calling thread always being a daemon.
-
-        // Catch exceptions since setDaemon can cause a
-        // security exception to be thrown under netscape
-        // in the Applet mode
-        try {
-            thread.setDaemon(true);
-        } catch (Exception e) {
-            // REVISIT - need to do some logging here
-        }
-
-        thread.start();
     }
 
-    /**
-    * This method will return the minimum number of threads maintained
-    * by the threadpool.
-    */
     public int minimumNumberOfThreads() {
         return minWorkerThreads;
     }
 
-    /**
-    * This method will return the maximum number of threads in the
-    * threadpool at any point in time, for the life of the threadpool
-    */
     public int maximumNumberOfThreads() {
         return maxWorkerThreads;
     }
 
-    /**
-    * This method will return the time in milliseconds when idle
-    * threads in the threadpool are removed.
-    */
     public long idleTimeoutForThreads() {
         return inactivityTimeout;
     }
 
-    /**
-    * This method will return the total number of threads currently in the
-    * threadpool. This method returns a value which is not synchronized.
-    */
     public int currentNumberOfThreads() {
-        synchronized (lock) {
+        synchronized (workQueue) {
             return currentThreadCount;
         }
     }
 
-    /**
-    * This method will return the number of available threads in the
-    * threadpool which are waiting for work. This method returns a
-    * value which is not synchronized.
-    */
+    void decrementCurrentNumberOfThreads() {
+        synchronized (workQueue) {
+            currentThreadCount--;
+        }
+    }
+
+    void incrementCurrentNumberOfThreads() {
+        synchronized (workQueue) {
+            currentThreadCount++;
+        }
+    }
+
     public int numberOfAvailableThreads() {
-        synchronized (lock) {
+        synchronized (workQueue) {
             return availableWorkerThreads;
         }
     }
 
-    /**
-    * This method will return the number of busy threads in the threadpool
-    * This method returns a value which is not synchronized.
-    */
     public int numberOfBusyThreads() {
-        synchronized (lock) {
+        synchronized (workQueue) {
             return (currentThreadCount - availableWorkerThreads);
         }
     }
 
-    /**
-     * This method returns the average elapsed time taken to complete a Work
-     * item in milliseconds.
-     */
     public long averageWorkCompletionTime() {
-        synchronized (lock) {
-            return (totalTimeTaken / processedCount);
+        synchronized (workQueue) {
+            return (totalTimeTaken.get() / processedCount.get());
         }
     }
 
-    /**
-     * This method returns the number of Work items processed by the threadpool
-     */
     public long currentProcessedCount() {
-        synchronized (lock) {
-            return processedCount;
+        synchronized (workQueue) {
+            return processedCount.get();
         }
     }
 
@@ -357,15 +433,37 @@
 
 
     private static synchronized int getUniqueThreadId() {
-        return ThreadPoolImpl.threadCounter++;
+        return ThreadPoolImpl.threadCounter.incrementAndGet();
+    }
+
+    /**
+     * This method will decrement the number of available threads
+     * in the threadpool which are waiting for work. Called from
+     * WorkQueueImpl.requestWork()
+     */
+    void decrementNumberOfAvailableThreads() {
+        synchronized (workQueue) {
+            availableWorkerThreads--;
+        }
+    }
+
+    /**
+     * This method will increment the number of available threads
+     * in the threadpool which are waiting for work. Called from
+     * WorkQueueImpl.requestWork()
+     */
+    void incrementNumberOfAvailableThreads() {
+        synchronized (workQueue) {
+            availableWorkerThreads++;
+        }
     }
 
 
-    private class WorkerThread extends Thread
+    private class WorkerThread extends Thread implements Closeable
     {
         private Work currentWork;
         private int threadId = 0; // unique id for the thread
-        // thread pool this WorkerThread belongs too
+        private volatile boolean closeCalled = false;
         private String threadPoolName;
         // name seen by Thread.getName()
         private StringBuffer workerThreadName = new StringBuffer();
@@ -377,100 +475,61 @@
             setName(composeWorkerThreadName(threadPoolName, "Idle"));
         }
 
-        public void run() {
-            while (true) {
-                try {
+        public synchronized void close() {
+            closeCalled = true;
+            interrupt();
+        }
 
-                    synchronized (lock) {
-                        availableWorkerThreads++;
-                    }
+        private void resetClassLoader() {
 
-                    // Get some work to do
-                    currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);
+        }
 
-                    synchronized (lock) {
-                        availableWorkerThreads--;
-                        // It is possible in notifyForAvailableWork that the
-                        // check for availableWorkerThreads = 0 may return
-                        // false, because the availableWorkerThreads has not been
-                        // decremented to zero before the producer thread added
-                        // work to the queue. This may create a deadlock, if the
-                        // executing thread needs information which is in the work
-                        // item queued in the workqueue, but has no thread to work
-                        // on it since none was created because availableWorkerThreads = 0
-                        // returned false.
-                        // The following code will ensure that a thread is always available
-                        // in those situations
-                        if  ((availableWorkerThreads == 0) &&
-                                (workQueue.workItemsInQueue() > 0)) {
-                            createWorkerThread();
-                        }
-                    }
+        private void performWork() {
+            long start = System.currentTimeMillis();
+            try {
+                currentWork.doWork();
+            } catch (Throwable t) {
+                wrapper.workerThreadDoWorkThrowable(this, t);
+            }
+            long elapsedTime = System.currentTimeMillis() - start;
+            totalTimeTaken.addAndGet(elapsedTime);
+            processedCount.incrementAndGet();
+        }
 
-                    // Set the thread name for debugging.
-                    setName(composeWorkerThreadName(threadPoolName,
-                                      Integer.toString(this.threadId)));
-
-                    long start = System.currentTimeMillis();
+        public void run() {
+            try  {
+                while (!closeCalled) {
+                    try {
+                        currentWork = ((WorkQueueImpl)workQueue).requestWork(
+                            inactivityTimeout);
+                        if (currentWork == null)
+                            continue;
+                    } catch (InterruptedException exc) {
+                        wrapper.workQueueThreadInterrupted( exc, getName(),
+                           Boolean.valueOf(closeCalled));
 
-                    try {
-                        // Do the work
-                        currentWork.doWork();
+                        continue ;
                     } catch (Throwable t) {
-                        // Ignore all errors.
-                        ;
+                         wrapper.workerThreadThrowableFromRequestWork(this, t,
+                                workQueue.getName());
+
+                        continue;
                     }
 
-                    long end = System.currentTimeMillis();
-
-
-                    synchronized (lock) {
-                        totalTimeTaken += (end - start);
-                        processedCount++;
-                    }
+                    performWork();
 
                     // set currentWork to null so that the work item can be
-                    // garbage collected
+                    // garbage collected without waiting for the next work item.
                     currentWork = null;
 
-                    setName(composeWorkerThreadName(threadPoolName, "Idle"));
-
-                } catch (TimeoutException e) {
-                    // This thread timed out waiting for something to do.
-
-                    synchronized (lock) {
-                        availableWorkerThreads--;
-
-                        // This should for both bounded and unbounded case
-                        if (currentThreadCount > minWorkerThreads) {
-                            currentThreadCount--;
-                            // This thread can exit.
-                            return;
-                        } else {
-                            // Go back to waiting on workQueue
-                            continue;
-                        }
-                    }
-                } catch (InterruptedException ie) {
-                    // InterruptedExceptions are
-                    // caught here.  Thus, threads can be forced out of
-                    // requestWork and so they have to reacquire the lock.
-                    // Other options include ignoring or
-                    // letting this thread die.
-                    // Ignoring for now. REVISIT
-                    synchronized (lock) {
-                        availableWorkerThreads--;
-                    }
-
-                } catch (Throwable e) {
-
-                    // Ignore any exceptions that currentWork.process
-                    // accidently lets through, but let Errors pass.
-                    // Add debugging output?  REVISIT
-                    synchronized (lock) {
-                        availableWorkerThreads--;
-                    }
-
+                    resetClassLoader();
+                }
+            } catch (Throwable e) {
+                // This should not be possible
+                wrapper.workerThreadCaughtUnexpectedThrowable(this,e);
+            } finally {
+                synchronized (workersLock) {
+                    workers.remove(this);
                 }
             }
         }