6747411: EventClient causes thread leaks
authoremcmanus
Fri, 12 Sep 2008 15:17:52 +0200
changeset 1229 8d219f46ab69
parent 1228 1515928f48cd
child 1230 18db753e5986
6747411: EventClient causes thread leaks Summary: Reworked thread management in EventClient and related classes. Reviewed-by: sjiang, dfuchs
jdk/src/share/classes/com/sun/jmx/event/LeaseManager.java
jdk/src/share/classes/com/sun/jmx/event/RepeatedSingletonJob.java
jdk/src/share/classes/com/sun/jmx/remote/internal/ClientCommunicatorAdmin.java
jdk/src/share/classes/javax/management/event/EventClient.java
jdk/src/share/classes/javax/management/event/FetchingEventRelay.java
jdk/src/share/classes/javax/management/event/RMIPushEventForwarder.java
jdk/src/share/classes/javax/management/remote/rmi/RMIConnector.java
jdk/test/javax/management/eventService/EventClientThreadTest.java
jdk/test/javax/management/eventService/SharingThreadTest.java
--- a/jdk/src/share/classes/com/sun/jmx/event/LeaseManager.java	Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/com/sun/jmx/event/LeaseManager.java	Fri Sep 12 15:17:52 2008 +0200
@@ -27,7 +27,6 @@
 
 import com.sun.jmx.remote.util.ClassLogger;
 import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -115,6 +114,7 @@
                 scheduled = null;
             }
             callback.run();
+            executor.shutdown();
         }
     }
 
@@ -131,6 +131,13 @@
         logger.trace("stop", "canceling lease");
         scheduled.cancel(false);
         scheduled = null;
+        try {
+            executor.shutdown();
+        } catch (SecurityException e) {
+            // OK: caller doesn't have RuntimePermission("modifyThread")
+            // which is unlikely in reality but triggers a test failure otherwise
+            logger.trace("stop", "exception from executor.shutdown", e);
+        }
     }
 
     private final Runnable callback;
@@ -138,7 +145,7 @@
 
     private final ScheduledExecutorService executor
             = Executors.newScheduledThreadPool(1,
-            new DaemonThreadFactory("LeaseManager"));
+            new DaemonThreadFactory("JMX LeaseManager %d"));
 
     private static final ClassLogger logger =
             new ClassLogger("javax.management.event", "LeaseManager");
--- a/jdk/src/share/classes/com/sun/jmx/event/RepeatedSingletonJob.java	Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/com/sun/jmx/event/RepeatedSingletonJob.java	Fri Sep 12 15:17:52 2008 +0200
@@ -95,7 +95,9 @@
             executor.execute(this);
         } catch (RejectedExecutionException e) {
             logger.warning(
-                    "setEventReceiver", "Executor threw exception", e);
+                    "execute",
+                    "Executor threw exception (" + this.getClass().getName() + ")",
+                    e);
             throw new RejectedExecutionException(
                     "Executor.execute threw exception -" +
                     "should not be possible", e);
--- a/jdk/src/share/classes/com/sun/jmx/remote/internal/ClientCommunicatorAdmin.java	Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/com/sun/jmx/remote/internal/ClientCommunicatorAdmin.java	Fri Sep 12 15:17:52 2008 +0200
@@ -32,13 +32,15 @@
 import com.sun.jmx.remote.util.EnvHelp;
 
 public abstract class ClientCommunicatorAdmin {
+    private static volatile long threadNo = 1;
+
     public ClientCommunicatorAdmin(long period) {
         this.period = period;
 
         if (period > 0) {
             checker = new Checker();
 
-            Thread t = new Thread(checker);
+            Thread t = new Thread(checker, "JMX client heartbeat " + ++threadNo);
             t.setDaemon(true);
             t.start();
         } else
--- a/jdk/src/share/classes/javax/management/event/EventClient.java	Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/javax/management/event/EventClient.java	Fri Sep 12 15:17:52 2008 +0200
@@ -264,11 +264,12 @@
                 new PerThreadGroupPool.Create<ScheduledThreadPoolExecutor>() {
             public ScheduledThreadPoolExecutor createThreadPool(ThreadGroup group) {
                 ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
-                        "EventClient lease renewer %d");
+                        "JMX EventClient lease renewer %d");
                 ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(
                         20, daemonThreadFactory);
-                exec.setKeepAliveTime(3, TimeUnit.SECONDS);
+                exec.setKeepAliveTime(1, TimeUnit.SECONDS);
                 exec.allowCoreThreadTimeOut(true);
+                exec.setRemoveOnCancelPolicy(true);
                 return exec;
             }
         };
--- a/jdk/src/share/classes/javax/management/event/FetchingEventRelay.java	Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/javax/management/event/FetchingEventRelay.java	Fri Sep 12 15:17:52 2008 +0200
@@ -31,10 +31,8 @@
 import java.io.IOException;
 import java.io.NotSerializableException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanException;
@@ -215,50 +213,47 @@
         this.maxNotifs = maxNotifs;
 
         if (executor == null) {
-            executor = Executors.newSingleThreadScheduledExecutor(
+            ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1,
                     daemonThreadFactory);
-        }
+            stpe.setKeepAliveTime(1, TimeUnit.SECONDS);
+            stpe.allowCoreThreadTimeOut(true);
+            executor = stpe;
+            this.defaultExecutor = stpe;
+        } else
+            this.defaultExecutor = null;
         this.executor = executor;
-        if (executor instanceof ScheduledExecutorService)
-            leaseScheduler = (ScheduledExecutorService) executor;
-        else {
-            leaseScheduler = Executors.newSingleThreadScheduledExecutor(
-                    daemonThreadFactory);
-        }
 
         startSequenceNumber = 0;
         fetchingJob = new MyJob();
     }
 
-    public void setEventReceiver(EventReceiver eventReceiver) {
+    public synchronized void setEventReceiver(EventReceiver eventReceiver) {
         if (logger.traceOn()) {
             logger.trace("setEventReceiver", ""+eventReceiver);
         }
 
         EventReceiver old = this.eventReceiver;
-        synchronized(fetchingJob) {
-            this.eventReceiver = eventReceiver;
-            if (old == null && eventReceiver != null)
-                fetchingJob.resume();
-        }
+        this.eventReceiver = eventReceiver;
+        if (old == null && eventReceiver != null)
+            fetchingJob.resume();
     }
 
     public String getClientId() {
         return clientId;
     }
 
-    public void stop() {
+    public synchronized void stop() {
         if (logger.traceOn()) {
             logger.trace("stop", "");
         }
-        synchronized(fetchingJob) {
-            if (stopped) {
-                return;
-            }
+        if (stopped) {
+            return;
+        }
 
-            stopped = true;
-            clientId = null;
-        }
+        stopped = true;
+        clientId = null;
+        if (defaultExecutor != null)
+            defaultExecutor.shutdown();
     }
 
     private class MyJob extends RepeatedSingletonJob {
@@ -372,10 +367,9 @@
     private final EventClientDelegateMBean delegate;
     private String clientId;
     private boolean stopped = false;
-    private volatile ScheduledFuture<?> leaseRenewalFuture;
 
     private final Executor executor;
-    private final ScheduledExecutorService leaseScheduler;
+    private final ExecutorService defaultExecutor;
     private final MyJob fetchingJob;
 
     private final long timeout;
@@ -385,5 +379,5 @@
             new ClassLogger("javax.management.event",
             "FetchingEventRelay");
     private static final ThreadFactory daemonThreadFactory =
-                    new DaemonThreadFactory("FetchingEventRelay-executor");
+                    new DaemonThreadFactory("JMX FetchingEventRelay executor %d");
 }
--- a/jdk/src/share/classes/javax/management/event/RMIPushEventForwarder.java	Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/javax/management/event/RMIPushEventForwarder.java	Fri Sep 12 15:17:52 2008 +0200
@@ -185,7 +185,7 @@
 
     private static final ExecutorService executor =
             Executors.newCachedThreadPool(
-            new DaemonThreadFactory("RMIEventForwarder Executor"));
+            new DaemonThreadFactory("JMX RMIEventForwarder Executor"));
     private final SendingJob sendingJob = new SendingJob();
 
     private final BlockingQueue<TargetedNotification> buffer;
--- a/jdk/src/share/classes/javax/management/remote/rmi/RMIConnector.java	Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/src/share/classes/javax/management/remote/rmi/RMIConnector.java	Fri Sep 12 15:17:52 2008 +0200
@@ -420,7 +420,7 @@
                 new PerThreadGroupPool.Create<ThreadPoolExecutor>() {
             public ThreadPoolExecutor createThreadPool(ThreadGroup group) {
                 ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
-                        "RMIConnector listener dispatch %d");
+                        "JMX RMIConnector listener dispatch %d");
                 ThreadPoolExecutor exec = new ThreadPoolExecutor(
                         1, 10, 1, TimeUnit.SECONDS,
                         new LinkedBlockingDeque<Runnable>(),
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/javax/management/eventService/EventClientThreadTest.java	Fri Sep 12 15:17:52 2008 +0200
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2008 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * @test
+ * @bug 6747411
+ * @summary Check that EventClient instances don't leak threads.
+ * @author Eamonn McManus
+ */
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.TreeSet;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerDelegate;
+import javax.management.MBeanServerNotification;
+import javax.management.Notification;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import javax.management.event.EventClient;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+public class EventClientThreadTest {
+    private static final int MAX_TIME_SECONDS = 20;
+
+    private static final BlockingQueue<Notification> queue =
+            new ArrayBlockingQueue(100);
+
+    private static final NotificationListener queueListener =
+            new NotificationListener() {
+        public void handleNotification(Notification notification,
+                                       Object handback) {
+            queue.add(notification);
+        }
+    };
+
+    private static final NotificationFilter dummyFilter =
+            new NotificationFilter() {
+        public boolean isNotificationEnabled(Notification notification) {
+            return true;
+        }
+    };
+
+    public static void main(String[] args) throws Exception {
+        long start = System.currentTimeMillis();
+        long deadline = start + MAX_TIME_SECONDS * 1000;
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://");
+        JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(
+                url, null, mbs);
+        cs.start();
+        JMXServiceURL addr = cs.getAddress();
+        JMXConnector cc = JMXConnectorFactory.connect(addr);
+        MBeanServerConnection mbsc = cc.getMBeanServerConnection();
+
+        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+
+        System.out.println("Opening and closing some EventClients...");
+        // If we create a connection, then create and destroy EventClients
+        // over it, then close it, there should be no "JMX *" threads left.
+        for (int i = 0; i < 5; i++)
+            test(mbsc);
+
+        cc.close();
+
+        showTime("opening and closing initial EventClients", start);
+
+        Set<String> jmxThreads = threadsMatching("JMX .*");
+        while (!jmxThreads.isEmpty() && System.currentTimeMillis() < deadline) {
+            Set<String> jmxThreadsNow = threadsMatching("JMX .*");
+            Set<String> gone = new TreeSet<String>(jmxThreads);
+            gone.removeAll(jmxThreadsNow);
+            for (String s : gone)
+                showTime("expiry of \"" + s + "\"", start);
+            jmxThreads = jmxThreadsNow;
+            Thread.sleep(10);
+        }
+        if (System.currentTimeMillis() >= deadline) {
+            showThreads(threads);
+            throw new Exception("Timed out waiting for JMX threads to expire");
+        }
+
+        showTime("waiting for JMX threads to expire", start);
+
+        System.out.println("TEST PASSED");
+    }
+
+    static void showThreads(ThreadMXBean threads) throws Exception {
+        long[] ids = threads.getAllThreadIds();
+        for (long id : ids) {
+            ThreadInfo ti = threads.getThreadInfo(id);
+            String name = (ti == null) ? "(defunct)" : ti.getThreadName();
+            System.out.printf("%4d %s\n", id, name);
+        }
+    }
+
+    static void showTime(String what, long start) {
+        long elapsed = System.currentTimeMillis() - start;
+        System.out.printf("Time after %s: %.3f s\n", what, elapsed / 1000.0);
+    }
+
+    static Set<String> threadsMatching(String pattern) {
+        Set<String> matching = new TreeSet<String>();
+        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+        long[] ids = threads.getAllThreadIds();
+        for (long id : ids) {
+            ThreadInfo ti = threads.getThreadInfo(id);
+            String name = (ti == null) ? "(defunct)" : ti.getThreadName();
+            if (name.matches(pattern))
+                matching.add(name);
+        }
+        return matching;
+    }
+
+    static void test(MBeanServerConnection mbsc) throws Exception {
+        final ObjectName delegateName = MBeanServerDelegate.DELEGATE_NAME;
+        final ObjectName testName = new ObjectName("test:type=Test");
+        EventClient ec = new EventClient(mbsc);
+        ec.addNotificationListener(delegateName, queueListener, null, null);
+        mbsc.createMBean(MBeanServerDelegate.class.getName(), testName);
+        mbsc.unregisterMBean(testName);
+        final String[] expectedTypes = {
+            MBeanServerNotification.REGISTRATION_NOTIFICATION,
+            MBeanServerNotification.UNREGISTRATION_NOTIFICATION,
+        };
+        for (String s : expectedTypes) {
+            Notification n = queue.poll(3, TimeUnit.SECONDS);
+            if (n == null)
+                throw new Exception("Timed out waiting for notif: " + s);
+            if (!(n instanceof MBeanServerNotification))
+                throw new Exception("Got notif of wrong class: " + n.getClass());
+            if (!n.getType().equals(s)) {
+                throw new Exception("Got notif of wrong type: " + n.getType() +
+                        " (expecting " + s + ")");
+            }
+        }
+        ec.removeNotificationListener(delegateName, queueListener);
+
+        ec.addNotificationListener(delegateName, queueListener, dummyFilter, "foo");
+        ec.removeNotificationListener(delegateName, queueListener, dummyFilter, "foo");
+
+        ec.close();
+    }
+}
\ No newline at end of file
--- a/jdk/test/javax/management/eventService/SharingThreadTest.java	Thu Sep 11 14:58:57 2008 -0700
+++ b/jdk/test/javax/management/eventService/SharingThreadTest.java	Fri Sep 12 15:17:52 2008 +0200
@@ -1,4 +1,4 @@
-/*/*
+/*
  * Copyright 2007 Sun Microsystems, Inc.  All Rights Reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *