6747411: EventClient causes thread leaks
Summary: Reworked thread management in EventClient and related classes.
Reviewed-by: sjiang, dfuchs
--- a/jdk/src/share/classes/com/sun/jmx/event/LeaseManager.java Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/com/sun/jmx/event/LeaseManager.java Fri Sep 12 15:17:52 2008 +0200
@@ -27,7 +27,6 @@
import com.sun.jmx.remote.util.ClassLogger;
import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -115,6 +114,7 @@
scheduled = null;
}
callback.run();
+ executor.shutdown();
}
}
@@ -131,6 +131,13 @@
logger.trace("stop", "canceling lease");
scheduled.cancel(false);
scheduled = null;
+ try {
+ executor.shutdown();
+ } catch (SecurityException e) {
+ // OK: caller doesn't have RuntimePermission("modifyThread")
+ // which is unlikely in reality but triggers a test failure otherwise
+ logger.trace("stop", "exception from executor.shutdown", e);
+ }
}
private final Runnable callback;
@@ -138,7 +145,7 @@
private final ScheduledExecutorService executor
= Executors.newScheduledThreadPool(1,
- new DaemonThreadFactory("LeaseManager"));
+ new DaemonThreadFactory("JMX LeaseManager %d"));
private static final ClassLogger logger =
new ClassLogger("javax.management.event", "LeaseManager");
--- a/jdk/src/share/classes/com/sun/jmx/event/RepeatedSingletonJob.java Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/com/sun/jmx/event/RepeatedSingletonJob.java Fri Sep 12 15:17:52 2008 +0200
@@ -95,7 +95,9 @@
executor.execute(this);
} catch (RejectedExecutionException e) {
logger.warning(
- "setEventReceiver", "Executor threw exception", e);
+ "execute",
+ "Executor threw exception (" + this.getClass().getName() + ")",
+ e);
throw new RejectedExecutionException(
"Executor.execute threw exception -" +
"should not be possible", e);
--- a/jdk/src/share/classes/com/sun/jmx/remote/internal/ClientCommunicatorAdmin.java Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/com/sun/jmx/remote/internal/ClientCommunicatorAdmin.java Fri Sep 12 15:17:52 2008 +0200
@@ -32,13 +32,15 @@
import com.sun.jmx.remote.util.EnvHelp;
public abstract class ClientCommunicatorAdmin {
+ private static volatile long threadNo = 1;
+
public ClientCommunicatorAdmin(long period) {
this.period = period;
if (period > 0) {
checker = new Checker();
- Thread t = new Thread(checker);
+ Thread t = new Thread(checker, "JMX client heartbeat " + ++threadNo);
t.setDaemon(true);
t.start();
} else
--- a/jdk/src/share/classes/javax/management/event/EventClient.java Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/javax/management/event/EventClient.java Fri Sep 12 15:17:52 2008 +0200
@@ -264,11 +264,12 @@
new PerThreadGroupPool.Create<ScheduledThreadPoolExecutor>() {
public ScheduledThreadPoolExecutor createThreadPool(ThreadGroup group) {
ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
- "EventClient lease renewer %d");
+ "JMX EventClient lease renewer %d");
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(
20, daemonThreadFactory);
- exec.setKeepAliveTime(3, TimeUnit.SECONDS);
+ exec.setKeepAliveTime(1, TimeUnit.SECONDS);
exec.allowCoreThreadTimeOut(true);
+ exec.setRemoveOnCancelPolicy(true);
return exec;
}
};
--- a/jdk/src/share/classes/javax/management/event/FetchingEventRelay.java Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/javax/management/event/FetchingEventRelay.java Fri Sep 12 15:17:52 2008 +0200
@@ -31,10 +31,8 @@
import java.io.IOException;
import java.io.NotSerializableException;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanException;
@@ -215,50 +213,47 @@
this.maxNotifs = maxNotifs;
if (executor == null) {
- executor = Executors.newSingleThreadScheduledExecutor(
+ ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1,
daemonThreadFactory);
- }
+ stpe.setKeepAliveTime(1, TimeUnit.SECONDS);
+ stpe.allowCoreThreadTimeOut(true);
+ executor = stpe;
+ this.defaultExecutor = stpe;
+ } else
+ this.defaultExecutor = null;
this.executor = executor;
- if (executor instanceof ScheduledExecutorService)
- leaseScheduler = (ScheduledExecutorService) executor;
- else {
- leaseScheduler = Executors.newSingleThreadScheduledExecutor(
- daemonThreadFactory);
- }
startSequenceNumber = 0;
fetchingJob = new MyJob();
}
- public void setEventReceiver(EventReceiver eventReceiver) {
+ public synchronized void setEventReceiver(EventReceiver eventReceiver) {
if (logger.traceOn()) {
logger.trace("setEventReceiver", ""+eventReceiver);
}
EventReceiver old = this.eventReceiver;
- synchronized(fetchingJob) {
- this.eventReceiver = eventReceiver;
- if (old == null && eventReceiver != null)
- fetchingJob.resume();
- }
+ this.eventReceiver = eventReceiver;
+ if (old == null && eventReceiver != null)
+ fetchingJob.resume();
}
public String getClientId() {
return clientId;
}
- public void stop() {
+ public synchronized void stop() {
if (logger.traceOn()) {
logger.trace("stop", "");
}
- synchronized(fetchingJob) {
- if (stopped) {
- return;
- }
+ if (stopped) {
+ return;
+ }
- stopped = true;
- clientId = null;
- }
+ stopped = true;
+ clientId = null;
+ if (defaultExecutor != null)
+ defaultExecutor.shutdown();
}
private class MyJob extends RepeatedSingletonJob {
@@ -372,10 +367,9 @@
private final EventClientDelegateMBean delegate;
private String clientId;
private boolean stopped = false;
- private volatile ScheduledFuture<?> leaseRenewalFuture;
private final Executor executor;
- private final ScheduledExecutorService leaseScheduler;
+ private final ExecutorService defaultExecutor;
private final MyJob fetchingJob;
private final long timeout;
@@ -385,5 +379,5 @@
new ClassLogger("javax.management.event",
"FetchingEventRelay");
private static final ThreadFactory daemonThreadFactory =
- new DaemonThreadFactory("FetchingEventRelay-executor");
+ new DaemonThreadFactory("JMX FetchingEventRelay executor %d");
}
--- a/jdk/src/share/classes/javax/management/event/RMIPushEventForwarder.java Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/javax/management/event/RMIPushEventForwarder.java Fri Sep 12 15:17:52 2008 +0200
@@ -185,7 +185,7 @@
private static final ExecutorService executor =
Executors.newCachedThreadPool(
- new DaemonThreadFactory("RMIEventForwarder Executor"));
+ new DaemonThreadFactory("JMX RMIEventForwarder Executor"));
private final SendingJob sendingJob = new SendingJob();
private final BlockingQueue<TargetedNotification> buffer;
--- a/jdk/src/share/classes/javax/management/remote/rmi/RMIConnector.java Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/javax/management/remote/rmi/RMIConnector.java Fri Sep 12 15:17:52 2008 +0200
@@ -420,7 +420,7 @@
new PerThreadGroupPool.Create<ThreadPoolExecutor>() {
public ThreadPoolExecutor createThreadPool(ThreadGroup group) {
ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
- "RMIConnector listener dispatch %d");
+ "JMX RMIConnector listener dispatch %d");
ThreadPoolExecutor exec = new ThreadPoolExecutor(
1, 10, 1, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(),
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/javax/management/eventService/EventClientThreadTest.java Fri Sep 12 15:17:52 2008 +0200
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * @test
+ * @bug 6747411
+ * @summary Check that EventClient instances don't leak threads.
+ * @author Eamonn McManus
+ */
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.TreeSet;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerDelegate;
+import javax.management.MBeanServerNotification;
+import javax.management.Notification;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import javax.management.event.EventClient;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+public class EventClientThreadTest {
+ private static final int MAX_TIME_SECONDS = 20;
+
+ private static final BlockingQueue<Notification> queue =
+ new ArrayBlockingQueue(100);
+
+ private static final NotificationListener queueListener =
+ new NotificationListener() {
+ public void handleNotification(Notification notification,
+ Object handback) {
+ queue.add(notification);
+ }
+ };
+
+ private static final NotificationFilter dummyFilter =
+ new NotificationFilter() {
+ public boolean isNotificationEnabled(Notification notification) {
+ return true;
+ }
+ };
+
+ public static void main(String[] args) throws Exception {
+ long start = System.currentTimeMillis();
+ long deadline = start + MAX_TIME_SECONDS * 1000;
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://");
+ JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(
+ url, null, mbs);
+ cs.start();
+ JMXServiceURL addr = cs.getAddress();
+ JMXConnector cc = JMXConnectorFactory.connect(addr);
+ MBeanServerConnection mbsc = cc.getMBeanServerConnection();
+
+ ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+
+ System.out.println("Opening and closing some EventClients...");
+ // If we create a connection, then create and destroy EventClients
+ // over it, then close it, there should be no "JMX *" threads left.
+ for (int i = 0; i < 5; i++)
+ test(mbsc);
+
+ cc.close();
+
+ showTime("opening and closing initial EventClients", start);
+
+ Set<String> jmxThreads = threadsMatching("JMX .*");
+ while (!jmxThreads.isEmpty() && System.currentTimeMillis() < deadline) {
+ Set<String> jmxThreadsNow = threadsMatching("JMX .*");
+ Set<String> gone = new TreeSet<String>(jmxThreads);
+ gone.removeAll(jmxThreadsNow);
+ for (String s : gone)
+ showTime("expiry of \"" + s + "\"", start);
+ jmxThreads = jmxThreadsNow;
+ Thread.sleep(10);
+ }
+ if (System.currentTimeMillis() >= deadline) {
+ showThreads(threads);
+ throw new Exception("Timed out waiting for JMX threads to expire");
+ }
+
+ showTime("waiting for JMX threads to expire", start);
+
+ System.out.println("TEST PASSED");
+ }
+
+ static void showThreads(ThreadMXBean threads) throws Exception {
+ long[] ids = threads.getAllThreadIds();
+ for (long id : ids) {
+ ThreadInfo ti = threads.getThreadInfo(id);
+ String name = (ti == null) ? "(defunct)" : ti.getThreadName();
+ System.out.printf("%4d %s\n", id, name);
+ }
+ }
+
+ static void showTime(String what, long start) {
+ long elapsed = System.currentTimeMillis() - start;
+ System.out.printf("Time after %s: %.3f s\n", what, elapsed / 1000.0);
+ }
+
+ static Set<String> threadsMatching(String pattern) {
+ Set<String> matching = new TreeSet<String>();
+ ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+ long[] ids = threads.getAllThreadIds();
+ for (long id : ids) {
+ ThreadInfo ti = threads.getThreadInfo(id);
+ String name = (ti == null) ? "(defunct)" : ti.getThreadName();
+ if (name.matches(pattern))
+ matching.add(name);
+ }
+ return matching;
+ }
+
+ static void test(MBeanServerConnection mbsc) throws Exception {
+ final ObjectName delegateName = MBeanServerDelegate.DELEGATE_NAME;
+ final ObjectName testName = new ObjectName("test:type=Test");
+ EventClient ec = new EventClient(mbsc);
+ ec.addNotificationListener(delegateName, queueListener, null, null);
+ mbsc.createMBean(MBeanServerDelegate.class.getName(), testName);
+ mbsc.unregisterMBean(testName);
+ final String[] expectedTypes = {
+ MBeanServerNotification.REGISTRATION_NOTIFICATION,
+ MBeanServerNotification.UNREGISTRATION_NOTIFICATION,
+ };
+ for (String s : expectedTypes) {
+ Notification n = queue.poll(3, TimeUnit.SECONDS);
+ if (n == null)
+ throw new Exception("Timed out waiting for notif: " + s);
+ if (!(n instanceof MBeanServerNotification))
+ throw new Exception("Got notif of wrong class: " + n.getClass());
+ if (!n.getType().equals(s)) {
+ throw new Exception("Got notif of wrong type: " + n.getType() +
+ " (expecting " + s + ")");
+ }
+ }
+ ec.removeNotificationListener(delegateName, queueListener);
+
+ ec.addNotificationListener(delegateName, queueListener, dummyFilter, "foo");
+ ec.removeNotificationListener(delegateName, queueListener, dummyFilter, "foo");
+
+ ec.close();
+ }
+}
\ No newline at end of file
--- a/jdk/test/javax/management/eventService/SharingThreadTest.java Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/test/javax/management/eventService/SharingThreadTest.java Fri Sep 12 15:17:52 2008 +0200
@@ -1,4 +1,4 @@
-/*/*
+/*
* Copyright 2007 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*