--- a/corba/src/share/classes/com/sun/corba/se/impl/orbutil/threadpool/ThreadPoolImpl.java Wed Jul 05 18:14:56 2017 +0200
+++ b/corba/src/share/classes/com/sun/corba/se/impl/orbutil/threadpool/ThreadPoolImpl.java Wed Jun 27 21:09:29 2012 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2003, 2004, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -25,6 +25,18 @@
package com.sun.corba.se.impl.orbutil.threadpool;
+import java.io.IOException;
+import java.io.Closeable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
@@ -36,12 +48,27 @@
import com.sun.corba.se.spi.monitoring.MonitoringConstants;
import com.sun.corba.se.spi.monitoring.MonitoredObject;
import com.sun.corba.se.spi.monitoring.MonitoringFactories;
+import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
+import com.sun.corba.se.impl.logging.ORBUtilSystemException;
+import com.sun.corba.se.impl.orbutil.ORBConstants;
+import com.sun.corba.se.spi.logging.CORBALogDomains;
+
public class ThreadPoolImpl implements ThreadPool
{
- private static int threadCounter = 0; // serial counter useful for debugging
+ // serial counter useful for debugging
+ private static AtomicInteger threadCounter = new AtomicInteger(0);
+ private static final ORBUtilSystemException wrapper =
+ ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT);
+
+ // Any time currentThreadCount and/or availableWorkerThreads is updated
+ // or accessed this ThreadPool's WorkQueue must be locked. And, it is
+ // expected that this ThreadPool's WorkQueue is the only object that
+ // updates and accesses these values directly and indirectly though a
+ // call to a method in this ThreadPool. If any call to update or access
+ // those values must synchronized on this ThreadPool's WorkQueue.
private WorkQueue workQueue;
// Stores the number of available worker threads
@@ -65,14 +92,11 @@
// Running count of the work items processed
// Set the value to 1 so that divide by zero is avoided in
// averageWorkCompletionTime()
- private long processedCount = 1;
+ private AtomicLong processedCount = new AtomicLong(1);
// Running aggregate of the time taken in millis to execute work items
// processed by the threads in the threadpool
- private long totalTimeTaken = 0;
-
- // Lock for protecting state when required
- private Object lock = new Object();
+ private AtomicLong totalTimeTaken = new AtomicLong(0);
// Name of the ThreadPool
private String name;
@@ -81,7 +105,10 @@
private MonitoredObject threadpoolMonitoredObject;
// ThreadGroup in which threads should be created
- private ThreadGroup threadGroup ;
+ private ThreadGroup threadGroup;
+
+ Object workersLock = new Object();
+ List<WorkerThread> workers = new ArrayList<>();
/**
* This constructor is used to create an unbounded threadpool
@@ -90,7 +117,7 @@
inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
maxWorkerThreads = Integer.MAX_VALUE;
workQueue = new WorkQueueImpl(this);
- threadGroup = tg ;
+ threadGroup = tg;
name = threadpoolName;
initializeMonitoring();
}
@@ -121,6 +148,30 @@
initializeMonitoring();
}
+ // Note that this method should not return until AFTER all threads have died.
+ public void close() throws IOException {
+
+ // Copy to avoid concurrent modification problems.
+ List<WorkerThread> copy = null;
+ synchronized (workersLock) {
+ copy = new ArrayList<>(workers);
+ }
+
+ for (WorkerThread wt : copy) {
+ wt.close();
+ while (wt.getState() != Thread.State.TERMINATED) {
+ try {
+ wt.join();
+ } catch (InterruptedException exc) {
+ wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this);
+ }
+ }
+ }
+
+ threadGroup = null;
+ }
+
+
// Setup monitoring for this threadpool
private void initializeMonitoring() {
// Get root monitored object
@@ -217,8 +268,8 @@
* or notify waiting threads on the queue for available work
*/
void notifyForAvailableWork(WorkQueue aWorkQueue) {
- synchronized (lock) {
- if (availableWorkerThreads == 0) {
+ synchronized (aWorkQueue) {
+ if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) {
createWorkerThread();
} else {
aWorkQueue.notify();
@@ -227,120 +278,145 @@
}
+ private Thread createWorkerThreadHelper( String name ) {
+ // Thread creation needs to be in a doPrivileged block
+ // if there is a non-null security manager for two reasons:
+ // 1. The creation of a thread in a specific ThreadGroup
+ // is a privileged operation. Lack of a doPrivileged
+ // block here causes an AccessControlException
+ // (see bug 6268145).
+ // 2. We want to make sure that the permissions associated
+ // with this thread do NOT include the permissions of
+ // the current thread that is calling this method.
+ // This leads to problems in the app server where
+ // some threads in the ThreadPool randomly get
+ // bad permissions, leading to unpredictable
+ // permission errors (see bug 6021011).
+ //
+ // A Java thread contains a stack of call frames,
+ // one for each method called that has not yet returned.
+ // Each method comes from a particular class. The class
+ // was loaded by a ClassLoader which has an associated
+ // CodeSource, and this determines the Permissions
+ // for all methods in that class. The current
+ // Permissions for the thread are the intersection of
+ // all Permissions for the methods on the stack.
+ // This is part of the Security Context of the thread.
+ //
+ // When a thread creates a new thread, the new thread
+ // inherits the security context of the old thread.
+ // This is bad in a ThreadPool, because different
+ // creators of threads may have different security contexts.
+ // This leads to occasional unpredictable errors when
+ // a thread is re-used in a different security context.
+ //
+ // Avoiding this problem is simple: just do the thread
+ // creation in a doPrivileged block. This sets the
+ // inherited security context to that of the code source
+ // for the ORB code itself, which contains all permissions
+ // in either Java SE or Java EE.
+ WorkerThread thread = new WorkerThread(threadGroup, name);
+ synchronized (workersLock) {
+ workers.add(thread);
+ }
+
+ // The thread must be set to a daemon thread so the
+ // VM can exit if the only threads left are PooledThreads
+ // or other daemons. We don't want to rely on the
+ // calling thread always being a daemon.
+ // Note that no exception is possible here since we
+ // are inside the doPrivileged block.
+ thread.setDaemon(true);
+
+ wrapper.workerThreadCreated(thread, thread.getContextClassLoader());
+
+ thread.start();
+ return null;
+ }
+
+
/**
* To be called from the workqueue to create worker threads when none
* available.
*/
void createWorkerThread() {
- WorkerThread thread;
-
- synchronized (lock) {
- if (boundedThreadPool) {
- if (currentThreadCount < maxWorkerThreads) {
- thread = new WorkerThread(threadGroup, getName());
- currentThreadCount++;
+ final String name = getName();
+ synchronized (workQueue) {
+ try {
+ if (System.getSecurityManager() == null) {
+ createWorkerThreadHelper(name);
} else {
- // REVIST - Need to create a thread to monitor the
- // the state for deadlock i.e. all threads waiting for
- // something which can be got from the item in the
- // workqueue, but there is no thread available to
- // process that work item - DEADLOCK !!
- return;
+ // If we get here, we need to create a thread.
+ AccessController.doPrivileged(
+ new PrivilegedAction() {
+ public Object run() {
+ return createWorkerThreadHelper(name);
+ }
+ }
+ );
}
- } else {
- thread = new WorkerThread(threadGroup, getName());
- currentThreadCount++;
+ } catch (Throwable t) {
+ // Decrementing the count of current worker threads.
+ // But, it will be increased in the finally block.
+ decrementCurrentNumberOfThreads();
+ wrapper.workerThreadCreationFailure(t);
+ } finally {
+ incrementCurrentNumberOfThreads();
}
}
-
- // The thread must be set to a daemon thread so the
- // VM can exit if the only threads left are PooledThreads
- // or other daemons. We don't want to rely on the
- // calling thread always being a daemon.
-
- // Catch exceptions since setDaemon can cause a
- // security exception to be thrown under netscape
- // in the Applet mode
- try {
- thread.setDaemon(true);
- } catch (Exception e) {
- // REVISIT - need to do some logging here
- }
-
- thread.start();
}
- /**
- * This method will return the minimum number of threads maintained
- * by the threadpool.
- */
public int minimumNumberOfThreads() {
return minWorkerThreads;
}
- /**
- * This method will return the maximum number of threads in the
- * threadpool at any point in time, for the life of the threadpool
- */
public int maximumNumberOfThreads() {
return maxWorkerThreads;
}
- /**
- * This method will return the time in milliseconds when idle
- * threads in the threadpool are removed.
- */
public long idleTimeoutForThreads() {
return inactivityTimeout;
}
- /**
- * This method will return the total number of threads currently in the
- * threadpool. This method returns a value which is not synchronized.
- */
public int currentNumberOfThreads() {
- synchronized (lock) {
+ synchronized (workQueue) {
return currentThreadCount;
}
}
- /**
- * This method will return the number of available threads in the
- * threadpool which are waiting for work. This method returns a
- * value which is not synchronized.
- */
+ void decrementCurrentNumberOfThreads() {
+ synchronized (workQueue) {
+ currentThreadCount--;
+ }
+ }
+
+ void incrementCurrentNumberOfThreads() {
+ synchronized (workQueue) {
+ currentThreadCount++;
+ }
+ }
+
public int numberOfAvailableThreads() {
- synchronized (lock) {
+ synchronized (workQueue) {
return availableWorkerThreads;
}
}
- /**
- * This method will return the number of busy threads in the threadpool
- * This method returns a value which is not synchronized.
- */
public int numberOfBusyThreads() {
- synchronized (lock) {
+ synchronized (workQueue) {
return (currentThreadCount - availableWorkerThreads);
}
}
- /**
- * This method returns the average elapsed time taken to complete a Work
- * item in milliseconds.
- */
public long averageWorkCompletionTime() {
- synchronized (lock) {
- return (totalTimeTaken / processedCount);
+ synchronized (workQueue) {
+ return (totalTimeTaken.get() / processedCount.get());
}
}
- /**
- * This method returns the number of Work items processed by the threadpool
- */
public long currentProcessedCount() {
- synchronized (lock) {
- return processedCount;
+ synchronized (workQueue) {
+ return processedCount.get();
}
}
@@ -357,15 +433,37 @@
private static synchronized int getUniqueThreadId() {
- return ThreadPoolImpl.threadCounter++;
+ return ThreadPoolImpl.threadCounter.incrementAndGet();
+ }
+
+ /**
+ * This method will decrement the number of available threads
+ * in the threadpool which are waiting for work. Called from
+ * WorkQueueImpl.requestWork()
+ */
+ void decrementNumberOfAvailableThreads() {
+ synchronized (workQueue) {
+ availableWorkerThreads--;
+ }
+ }
+
+ /**
+ * This method will increment the number of available threads
+ * in the threadpool which are waiting for work. Called from
+ * WorkQueueImpl.requestWork()
+ */
+ void incrementNumberOfAvailableThreads() {
+ synchronized (workQueue) {
+ availableWorkerThreads++;
+ }
}
- private class WorkerThread extends Thread
+ private class WorkerThread extends Thread implements Closeable
{
private Work currentWork;
private int threadId = 0; // unique id for the thread
- // thread pool this WorkerThread belongs too
+ private volatile boolean closeCalled = false;
private String threadPoolName;
// name seen by Thread.getName()
private StringBuffer workerThreadName = new StringBuffer();
@@ -377,100 +475,61 @@
setName(composeWorkerThreadName(threadPoolName, "Idle"));
}
- public void run() {
- while (true) {
- try {
+ public synchronized void close() {
+ closeCalled = true;
+ interrupt();
+ }
- synchronized (lock) {
- availableWorkerThreads++;
- }
+ private void resetClassLoader() {
- // Get some work to do
- currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);
+ }
- synchronized (lock) {
- availableWorkerThreads--;
- // It is possible in notifyForAvailableWork that the
- // check for availableWorkerThreads = 0 may return
- // false, because the availableWorkerThreads has not been
- // decremented to zero before the producer thread added
- // work to the queue. This may create a deadlock, if the
- // executing thread needs information which is in the work
- // item queued in the workqueue, but has no thread to work
- // on it since none was created because availableWorkerThreads = 0
- // returned false.
- // The following code will ensure that a thread is always available
- // in those situations
- if ((availableWorkerThreads == 0) &&
- (workQueue.workItemsInQueue() > 0)) {
- createWorkerThread();
- }
- }
+ private void performWork() {
+ long start = System.currentTimeMillis();
+ try {
+ currentWork.doWork();
+ } catch (Throwable t) {
+ wrapper.workerThreadDoWorkThrowable(this, t);
+ }
+ long elapsedTime = System.currentTimeMillis() - start;
+ totalTimeTaken.addAndGet(elapsedTime);
+ processedCount.incrementAndGet();
+ }
- // Set the thread name for debugging.
- setName(composeWorkerThreadName(threadPoolName,
- Integer.toString(this.threadId)));
-
- long start = System.currentTimeMillis();
+ public void run() {
+ try {
+ while (!closeCalled) {
+ try {
+ currentWork = ((WorkQueueImpl)workQueue).requestWork(
+ inactivityTimeout);
+ if (currentWork == null)
+ continue;
+ } catch (InterruptedException exc) {
+ wrapper.workQueueThreadInterrupted( exc, getName(),
+ Boolean.valueOf(closeCalled));
- try {
- // Do the work
- currentWork.doWork();
+ continue ;
} catch (Throwable t) {
- // Ignore all errors.
- ;
+ wrapper.workerThreadThrowableFromRequestWork(this, t,
+ workQueue.getName());
+
+ continue;
}
- long end = System.currentTimeMillis();
-
-
- synchronized (lock) {
- totalTimeTaken += (end - start);
- processedCount++;
- }
+ performWork();
// set currentWork to null so that the work item can be
- // garbage collected
+ // garbage collected without waiting for the next work item.
currentWork = null;
- setName(composeWorkerThreadName(threadPoolName, "Idle"));
-
- } catch (TimeoutException e) {
- // This thread timed out waiting for something to do.
-
- synchronized (lock) {
- availableWorkerThreads--;
-
- // This should for both bounded and unbounded case
- if (currentThreadCount > minWorkerThreads) {
- currentThreadCount--;
- // This thread can exit.
- return;
- } else {
- // Go back to waiting on workQueue
- continue;
- }
- }
- } catch (InterruptedException ie) {
- // InterruptedExceptions are
- // caught here. Thus, threads can be forced out of
- // requestWork and so they have to reacquire the lock.
- // Other options include ignoring or
- // letting this thread die.
- // Ignoring for now. REVISIT
- synchronized (lock) {
- availableWorkerThreads--;
- }
-
- } catch (Throwable e) {
-
- // Ignore any exceptions that currentWork.process
- // accidently lets through, but let Errors pass.
- // Add debugging output? REVISIT
- synchronized (lock) {
- availableWorkerThreads--;
- }
-
+ resetClassLoader();
+ }
+ } catch (Throwable e) {
+ // This should not be possible
+ wrapper.workerThreadCaughtUnexpectedThrowable(this,e);
+ } finally {
+ synchronized (workersLock) {
+ workers.remove(this);
}
}
}