6754988: Update copyright year
Summary: Update for files that have been modified starting July 2008
Reviewed-by: ohair, tbell
/*
* Copyright 2007-2008 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Sun designates this
* particular file as subject to the "Classpath" exception as provided
* by Sun in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
package javax.management.event;
import com.sun.jmx.event.DaemonThreadFactory;
import com.sun.jmx.event.LeaseRenewer;
import com.sun.jmx.event.ReceiverBuffer;
import com.sun.jmx.event.RepeatedSingletonJob;
import com.sun.jmx.namespace.JMXNamespaceUtils;
import com.sun.jmx.mbeanserver.PerThreadGroupPool;
import com.sun.jmx.remote.util.ClassLogger;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.InstanceNotFoundException;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanServerConnection;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.NotificationResult;
import javax.management.remote.TargetedNotification;
/**
* <p>This class is used to manage its notification listeners on the client
* side in the same way as on the MBean server side. This class needs to work
* with an {@link EventClientDelegateMBean} on the server side.</p>
*
* <P>A user can specify an {@link EventRelay} object to specify how to receive
* notifications forwarded by the {@link EventClientDelegateMBean}. By default,
* the class {@link FetchingEventRelay} is used.</p>
*
* <p>A user can specify an {@link java.util.concurrent.Executor Executor}
* to distribute notifications to local listeners. If no executor is
* specified, the thread in the {@link EventRelay} which calls {@link
* EventReceiver#receive EventReceiver.receive} will be reused to distribute
* the notifications (in other words, to call the {@link
* NotificationListener#handleNotification handleNotification} method of the
* appropriate listeners). It is useful to make a separate thread do this
* distribution in some cases. For example, if network communication is slow,
* the forwarding thread can concentrate on communication while, locally,
* the distributing thread distributes the received notifications. Another
* usage is to share a thread pool between many clients, for scalability.
* Note, though, that if the {@code Executor} can create more than one thread
* then it is possible that listeners will see notifications in a different
* order from the order in which they were sent.</p>
*
* <p>An object of this class sends notifications to listeners added with
* {@link #addEventClientListener}. The {@linkplain Notification#getType()
* type} of each such notification is one of {@link #FAILED}, {@link #NONFATAL},
* or {@link #NOTIFS_LOST}.</p>
*
* @since JMX 2.0
*/
public class EventClient implements EventConsumer, NotificationManager {
/**
* <p>A notification string type used by an {@code EventClient} object
* to inform a listener added by {@link #addEventClientListener} that
* it failed to get notifications from a remote server, and that it is
* possible that no more notifications will be delivered.</p>
*
* @see #addEventClientListener
* @see EventReceiver#failed
*/
public static final String FAILED = "jmx.event.service.failed";
/**
* <p>Reports that an unexpected exception has been received by the {@link
* EventRelay} object but that it is non-fatal. For example, a notification
* received is not serializable or its class is not found.</p>
*
* @see #addEventClientListener
* @see EventReceiver#nonFatal
*/
public static final String NONFATAL = "jmx.event.service.nonfatal";
/**
* <p>A notification string type used by an {@code EventClient} object to
* inform a listener added by {@code #addEventClientListener} that it
* has detected that notifications have been lost. The {@link
* Notification#getUserData() userData} of the notification is a Long which
* is an upper bound on the number of lost notifications that have just
* been detected.</p>
*
* @see #addEventClientListener
*/
public static final String NOTIFS_LOST = "jmx.event.service.notifs.lost";
/**
* The default lease time, {@value}, in milliseconds.
*
* @see EventClientDelegateMBean#lease
*/
public static final long DEFAULT_LEASE_TIMEOUT = 300000;
/**
* <p>Constructs a default {@code EventClient} object.</p>
*
* <p>This object creates a {@link FetchingEventRelay} object to
* receive notifications forwarded by the {@link EventClientDelegateMBean}.
* The {@link EventClientDelegateMBean} that it works with is the
* one registered with the {@linkplain EventClientDelegate#OBJECT_NAME
* default ObjectName}. The thread from the {@link FetchingEventRelay}
* object that fetches the notifications is also used to distribute them.
*
* @param conn An {@link MBeanServerConnection} object used to communicate
* with an {@link EventClientDelegateMBean} MBean.
*
* @throws IllegalArgumentException If {@code conn} is null.
* @throws IOException If an I/O error occurs when communicating with the
* {@code EventClientDelegateMBean}.
*/
public EventClient(MBeanServerConnection conn) throws IOException {
this(EventClientDelegate.getProxy(conn));
}
/**
* Constructs an {@code EventClient} object with a specified
* {@link EventClientDelegateMBean}.
*
* <p>This object creates a {@link FetchingEventRelay} object to receive
* notifications forwarded by the {@link EventClientDelegateMBean}. The
* thread from the {@link FetchingEventRelay} object that fetches the
* notifications is also used to distribute them.
*
* @param delegate An {@link EventClientDelegateMBean} object to work with.
*
* @throws IllegalArgumentException If {@code delegate} is null.
* @throws IOException If an I/O error occurs when communicating with the
* the {@link EventClientDelegateMBean}.
*/
public EventClient(EventClientDelegateMBean delegate)
throws IOException {
this(delegate, null, null, null, DEFAULT_LEASE_TIMEOUT);
}
/**
* Constructs an {@code EventClient} object with the specified
* {@link EventClientDelegateMBean}, {@link EventRelay}
* object, and distributing thread.
*
* @param delegate An {@link EventClientDelegateMBean} object to work with.
* Usually, this will be a proxy constructed using
* {@link EventClientDelegate#getProxy}.
* @param eventRelay An object used to receive notifications
* forwarded by the {@link EventClientDelegateMBean}. If {@code null}, a
* {@link FetchingEventRelay} object will be used.
* @param distributingExecutor Used to distribute notifications to local
* listeners. If {@code null}, the thread that calls {@link
* EventReceiver#receive EventReceiver.receive} from the {@link EventRelay}
* object is used.
* @param leaseScheduler An object that will be used to schedule the
* periodic {@linkplain EventClientDelegateMBean#lease lease updates}.
* If {@code null}, a default scheduler will be used.
* @param requestedLeaseTime The lease time used to keep this client alive
* in the {@link EventClientDelegateMBean}. A value of zero is equivalent
* to the {@linkplain #DEFAULT_LEASE_TIMEOUT default value}.
*
* @throws IllegalArgumentException If {@code delegate} is null.
* @throws IOException If an I/O error occurs when communicating with the
* {@link EventClientDelegateMBean}.
*/
public EventClient(EventClientDelegateMBean delegate,
EventRelay eventRelay,
Executor distributingExecutor,
ScheduledExecutorService leaseScheduler,
long requestedLeaseTime)
throws IOException {
if (delegate == null) {
throw new IllegalArgumentException("Null EventClientDelegateMBean");
}
if (requestedLeaseTime == 0)
requestedLeaseTime = DEFAULT_LEASE_TIMEOUT;
else if (requestedLeaseTime < 0) {
throw new IllegalArgumentException(
"Negative lease time: " + requestedLeaseTime);
}
eventClientDelegate = delegate;
if (eventRelay != null) {
this.eventRelay = eventRelay;
} else {
try {
this.eventRelay = new FetchingEventRelay(delegate);
} catch (IOException ioe) {
throw ioe;
} catch (Exception e) {
// impossible?
final IOException ioee = new IOException(e.toString());
ioee.initCause(e);
throw ioee;
}
}
if (distributingExecutor == null)
distributingExecutor = callerExecutor;
this.distributingExecutor = distributingExecutor;
this.dispatchingJob = new DispatchingJob();
clientId = this.eventRelay.getClientId();
this.requestedLeaseTime = requestedLeaseTime;
if (leaseScheduler == null)
leaseScheduler = defaultLeaseScheduler();
leaseRenewer = new LeaseRenewer(leaseScheduler, renewLease);
if (logger.traceOn()) {
logger.trace("init", "New EventClient: "+clientId);
}
}
private static ScheduledExecutorService defaultLeaseScheduler() {
// The default lease scheduler uses a ScheduledThreadPoolExecutor
// with a maximum of 20 threads. This means that if you have many
// EventClient instances and some of them get blocked (because of an
// unresponsive network, for example), then even the instances that
// are connected to responsive servers may have their leases expire.
// XXX check if the above is true and possibly fix.
PerThreadGroupPool.Create<ScheduledThreadPoolExecutor> create =
new PerThreadGroupPool.Create<ScheduledThreadPoolExecutor>() {
public ScheduledThreadPoolExecutor createThreadPool(ThreadGroup group) {
ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
"JMX EventClient lease renewer %d");
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(
20, daemonThreadFactory);
exec.setKeepAliveTime(1, TimeUnit.SECONDS);
exec.allowCoreThreadTimeOut(true);
exec.setRemoveOnCancelPolicy(true);
return exec;
}
};
return leaseRenewerThreadPool.getThreadPoolExecutor(create);
}
/**
* <p>Closes this EventClient, removes all listeners and stops receiving
* notifications.</p>
*
* <p>This method calls {@link
* EventClientDelegateMBean#removeClient(String)} and {@link
* EventRelay#stop}. Both operations occur even if one of them
* throws an {@code IOException}.
*
* @throws IOException if an I/O error occurs when communicating with
* {@link EventClientDelegateMBean}, or if {@link EventRelay#stop}
* throws an {@code IOException}.
*/
public void close() throws IOException {
if (logger.traceOn()) {
logger.trace("close", clientId);
}
synchronized(listenerInfoMap) {
if (closed) {
return;
}
closed = true;
listenerInfoMap.clear();
}
if (leaseRenewer != null)
leaseRenewer.close();
IOException ioe = null;
try {
eventRelay.stop();
} catch (IOException e) {
ioe = e;
logger.debug("close", "EventRelay.stop", e);
}
try {
eventClientDelegate.removeClient(clientId);
} catch (Exception e) {
if (e instanceof IOException)
ioe = (IOException) e;
else
ioe = new IOException(e);
logger.debug("close",
"Got exception when removing "+clientId, e);
}
if (ioe != null)
throw ioe;
}
/**
* <p>Determine if this {@code EventClient} is closed.</p>
*
* @return True if the {@code EventClient} is closed.
*/
public boolean closed() {
return closed;
}
/**
* <p>Return the {@link EventRelay} associated with this
* {@code EventClient}.</p>
*
* @return The {@link EventRelay} object used.
*/
public EventRelay getEventRelay() {
return eventRelay;
}
/**
* <p>Return the lease time that this {@code EventClient} requests
* on every lease renewal.</p>
*
* @return The requested lease time.
*
* @see EventClientDelegateMBean#lease
*/
public long getRequestedLeaseTime() {
return requestedLeaseTime;
}
/**
* @see javax.management.MBeanServerConnection#addNotificationListener(
* ObjectName, NotificationListener, NotificationFilter, Object).
*/
public void addNotificationListener(ObjectName name,
NotificationListener listener,
NotificationFilter filter,
Object handback)
throws InstanceNotFoundException, IOException {
if (logger.traceOn()) {
logger.trace("addNotificationListener", "");
}
checkState();
Integer listenerId;
try {
listenerId =
eventClientDelegate.addListener(clientId, name, filter);
} catch (EventClientNotFoundException ecnfe) {
final IOException ioe = new IOException();
ioe.initCause(ecnfe);
throw ioe;
}
synchronized(listenerInfoMap) {
listenerInfoMap.put(listenerId, new ListenerInfo(
name,
listener,
filter,
handback,
false));
}
startListening();
}
/**
* @see javax.management.MBeanServerConnection#removeNotificationListener(
* ObjectName, NotificationListener).
*/
public void removeNotificationListener(ObjectName name,
NotificationListener listener)
throws InstanceNotFoundException,
ListenerNotFoundException,
IOException {
if (logger.traceOn()) {
logger.trace("removeNotificationListener", "");
}
checkState();
for (Integer id : getListenerInfo(name, listener, false)) {
removeListener(id);
}
}
/**
* @see javax.management.MBeanServerConnection#removeNotificationListener(
* ObjectName, NotificationListener, NotificationFilter, Object).
*/
public void removeNotificationListener(ObjectName name,
NotificationListener listener,
NotificationFilter filter,
Object handback)
throws InstanceNotFoundException,
ListenerNotFoundException,
IOException {
if (logger.traceOn()) {
logger.trace("removeNotificationListener", "with all arguments.");
}
checkState();
final Integer listenerId =
getListenerInfo(name, listener, filter, handback, false);
removeListener(listenerId);
}
/**
* @see javax.management.event.EventConsumer#unsubscribe(
* ObjectName, NotificationListener).
*/
public void unsubscribe(ObjectName name,
NotificationListener listener)
throws ListenerNotFoundException, IOException {
if (logger.traceOn()) {
logger.trace("unsubscribe", "");
}
checkState();
final Integer listenerId =
getMatchedListenerInfo(name, listener, true);
synchronized(listenerInfoMap) {
if (listenerInfoMap.remove(listenerId) == null) {
throw new ListenerNotFoundException();
}
}
stopListening();
try {
eventClientDelegate.removeListenerOrSubscriber(clientId, listenerId);
} catch (InstanceNotFoundException e) {
logger.trace("unsubscribe", "removeSubscriber", e);
} catch (EventClientNotFoundException cnfe) {
logger.trace("unsubscribe", "removeSubscriber", cnfe);
}
}
/**
* @see javax.management.event.EventConsumer#subscribe(
* ObjectName, NotificationListener, NotificationFilter, Object).
*/
public void subscribe(ObjectName name,
NotificationListener listener,
NotificationFilter filter,
Object handback) throws IOException {
if (logger.traceOn()) {
logger.trace("subscribe", "");
}
checkState();
Integer listenerId;
try {
listenerId =
eventClientDelegate.addSubscriber(clientId, name, filter);
} catch (EventClientNotFoundException ecnfe) {
final IOException ioe = new IOException();
ioe.initCause(ecnfe);
throw ioe;
}
synchronized(listenerInfoMap) {
listenerInfoMap.put(listenerId, new ListenerInfo(
name,
listener,
filter,
handback,
true));
}
startListening();
}
/**
* <p>Adds a set of listeners to the remote MBeanServer. This method can
* be used to copy the listeners from one {@code EventClient} to another.</p>
*
* <p>A listener is represented by a {@link ListenerInfo} object. The listener
* is added by calling {@link #subscribe(ObjectName,
* NotificationListener, NotificationFilter, Object)} if the method
* {@link ListenerInfo#isSubscription() isSubscription}
* returns {@code true}; otherwise it is added by calling
* {@link #addNotificationListener(ObjectName, NotificationListener,
* NotificationFilter, Object)}.</p>
*
* <P>The method returns the listeners which were added successfully. The
* elements in the returned collection are a subset of the elements in
* {@code infoList}. If all listeners were added successfully, the two
* collections are the same. If no listener was added successfully, the
* returned collection is empty.</p>
*
* @param listeners the listeners to add.
*
* @return The listeners that were added successfully.
*
* @throws IOException If an I/O error occurs.
*
* @see #getListeners()
*/
public Collection<ListenerInfo> addListeners(Collection<ListenerInfo> listeners)
throws IOException {
if (logger.traceOn()) {
logger.trace("addListeners", "");
}
checkState();
if (listeners == null || listeners.isEmpty())
return Collections.emptySet();
final List<ListenerInfo> list = new ArrayList<ListenerInfo>();
for (ListenerInfo l : listeners) {
try {
if (l.isSubscription()) {
subscribe(l.getObjectName(),
l.getListener(),
l.getFilter(),
l.getHandback());
} else {
addNotificationListener(l.getObjectName(),
l.getListener(),
l.getFilter(),
l.getHandback());
}
list.add(l);
} catch (Exception e) {
if (logger.traceOn()) {
logger.trace("addListeners", "failed to add: "+l, e);
}
}
}
return list;
}
/**
* Returns the set of listeners that have been added through
* this {@code EventClient} and not subsequently removed.
*
* @return A collection of listener information. Empty if there are no
* current listeners or if this {@code EventClient} has been {@linkplain
* #close closed}.
*
* @see #addListeners
*/
public Collection<ListenerInfo> getListeners() {
if (logger.traceOn()) {
logger.trace("getListeners", "");
}
synchronized(listenerInfoMap) {
return Collections.unmodifiableCollection(listenerInfoMap.values());
}
}
/**
* Adds a listener to receive the {@code EventClient} notifications specified in
* {@link #getEventClientNotificationInfo}.
*
* @param listener A listener to receive {@code EventClient} notifications.
* @param filter A filter to select which notifications are to be delivered
* to the listener, or {@code null} if all notifications are to be delivered.
* @param handback An object to be given to the listener along with each
* notification. Can be null.
* @throws NullPointerException If listener is null.
* @see #removeEventClientListener
*/
public void addEventClientListener(NotificationListener listener,
NotificationFilter filter,
Object handback) {
if (logger.traceOn()) {
logger.trace("addEventClientListener", "");
}
broadcaster.addNotificationListener(listener, filter, handback);
}
/**
* Removes a listener added to receive {@code EventClient} notifications specified in
* {@link #getEventClientNotificationInfo}.
*
* @param listener A listener to receive {@code EventClient} notifications.
* @throws NullPointerException If listener is null.
* @throws ListenerNotFoundException If the listener is not added to
* this {@code EventClient}.
*/
public void removeEventClientListener(NotificationListener listener)
throws ListenerNotFoundException {
if (logger.traceOn()) {
logger.trace("removeEventClientListener", "");
}
broadcaster.removeNotificationListener(listener);
}
/**
* <p>Get the types of notification that an {@code EventClient} can send
* to listeners added with {@link #addEventClientListener
* addEventClientListener}.</p>
*
* @return Types of notification emitted by this {@code EventClient}.
*
* @see #FAILED
* @see #NONFATAL
* @see #NOTIFS_LOST
*/
public MBeanNotificationInfo[] getEventClientNotificationInfo() {
return myInfo.clone();
}
private static boolean match(ListenerInfo li,
ObjectName name,
NotificationListener listener,
boolean subscribed) {
return li.getObjectName().equals(name) &&
li.getListener() == listener &&
li.isSubscription() == subscribed;
}
private static boolean match(ListenerInfo li,
ObjectName name,
NotificationListener listener,
NotificationFilter filter,
Object handback,
boolean subscribed) {
return li.getObjectName().equals(name) &&
li.getFilter() == filter &&
li.getListener() == listener &&
li.getHandback() == handback &&
li.isSubscription() == subscribed;
}
// ---------------------------------------------------
// private classes
// ---------------------------------------------------
private class DispatchingJob extends RepeatedSingletonJob {
public DispatchingJob() {
super(distributingExecutor);
}
public boolean isSuspended() {
return closed || buffer.size() == 0;
}
public void task() {
TargetedNotification[] tns ;
int lost = 0;
synchronized(buffer) {
tns = buffer.removeNotifs();
lost = buffer.removeLost();
}
if ((tns == null || tns.length == 0)
&& lost == 0) {
return;
}
// forwarding
if (tns != null && tns.length > 0) {
if (logger.traceOn()) {
logger.trace("DispatchingJob-task",
"Forwarding: "+tns.length);
}
for (TargetedNotification tn : tns) {
final ListenerInfo li = listenerInfoMap.get(tn.getListenerID());
try {
li.getListener().handleNotification(tn.getNotification(),
li.getHandback());
} catch (Exception e) {
logger.fine(
"DispatchingJob.task", "listener got exception", e);
}
}
}
if (lost > 0) {
if (logger.traceOn()) {
logger.trace("DispatchingJob-task",
"lost: "+lost);
}
final Notification n = new Notification(NOTIFS_LOST,
EventClient.this,
myNotifCounter.getAndIncrement(),
System.currentTimeMillis(),
"Lost notifications.");
n.setUserData(new Long(lost));
broadcaster.sendNotification(n);
}
}
}
private class EventReceiverImpl implements EventReceiver {
public void receive(NotificationResult nr) {
if (logger.traceOn()) {
logger.trace("MyEventReceiver-receive", "");
}
synchronized(buffer) {
buffer.addNotifs(nr);
dispatchingJob.resume();
}
}
public void failed(Throwable t) {
if (logger.traceOn()) {
logger.trace("MyEventReceiver-failed", "", t);
}
final Notification n = new Notification(FAILED,
this,
myNotifCounter.getAndIncrement(),
System.currentTimeMillis());
n.setSource(t);
broadcaster.sendNotification(n);
}
public void nonFatal(Exception e) {
if (logger.traceOn()) {
logger.trace("MyEventReceiver-nonFatal", "", e);
}
final Notification n = new Notification(NONFATAL,
this,
myNotifCounter.getAndIncrement(),
System.currentTimeMillis());
n.setSource(e);
broadcaster.sendNotification(n);
}
}
// ----------------------------------------------------
// private class
// ----------------------------------------------------
// ----------------------------------------------------
// private methods
// ----------------------------------------------------
private Integer getListenerInfo(ObjectName name,
NotificationListener listener,
NotificationFilter filter,
Object handback,
boolean subscribed) throws ListenerNotFoundException {
synchronized(listenerInfoMap) {
for (Map.Entry<Integer, ListenerInfo> entry :
listenerInfoMap.entrySet()) {
ListenerInfo li = entry.getValue();
if (match(li, name, listener, filter, handback, subscribed)) {
return entry.getKey();
}
}
}
throw new ListenerNotFoundException();
}
private Integer getMatchedListenerInfo(ObjectName name,
NotificationListener listener,
boolean subscribed) throws ListenerNotFoundException {
synchronized(listenerInfoMap) {
for (Map.Entry<Integer, ListenerInfo> entry :
listenerInfoMap.entrySet()) {
ListenerInfo li = entry.getValue();
if (li.getObjectName().equals(name) &&
li.getListener() == listener &&
li.isSubscription() == subscribed) {
return entry.getKey();
}
}
}
throw new ListenerNotFoundException();
}
private Collection<Integer> getListenerInfo(ObjectName name,
NotificationListener listener,
boolean subscribed) throws ListenerNotFoundException {
final ArrayList<Integer> ids = new ArrayList<Integer>();
synchronized(listenerInfoMap) {
for (Map.Entry<Integer, ListenerInfo> entry :
listenerInfoMap.entrySet()) {
ListenerInfo li = entry.getValue();
if (match(li, name, listener, subscribed)) {
ids.add(entry.getKey());
}
}
}
if (ids.isEmpty()) {
throw new ListenerNotFoundException();
}
return ids;
}
private void checkState() throws IOException {
synchronized(listenerInfoMap) {
if (closed) {
throw new IOException("Ended!");
}
}
}
private void startListening() throws IOException {
synchronized(listenerInfoMap) {
if (!startedListening && listenerInfoMap.size() > 0) {
eventRelay.setEventReceiver(myReceiver);
}
startedListening = true;
if (logger.traceOn()) {
logger.trace("startListening", "listening");
}
}
}
private void stopListening() throws IOException {
synchronized(listenerInfoMap) {
if (listenerInfoMap.size() == 0 && startedListening) {
eventRelay.setEventReceiver(null);
startedListening = false;
if (logger.traceOn()) {
logger.trace("stopListening", "non listening");
}
}
}
}
private void removeListener(Integer id)
throws InstanceNotFoundException,
ListenerNotFoundException,
IOException {
synchronized(listenerInfoMap) {
if (listenerInfoMap.remove(id) == null) {
throw new ListenerNotFoundException();
}
stopListening();
}
try {
eventClientDelegate.removeListenerOrSubscriber(clientId, id);
} catch (EventClientNotFoundException cnfe) {
logger.trace("removeListener", "ecd.removeListener", cnfe);
}
}
// ----------------------------------------------------
// private variables
// ----------------------------------------------------
private static final ClassLogger logger =
new ClassLogger("javax.management.event", "EventClient");
private final Executor distributingExecutor;
private final EventClientDelegateMBean eventClientDelegate;
private final EventRelay eventRelay;
private volatile String clientId = null;
private final long requestedLeaseTime;
private final ReceiverBuffer buffer = new ReceiverBuffer();
private final EventReceiverImpl myReceiver =
new EventReceiverImpl();
private final DispatchingJob dispatchingJob;
private final HashMap<Integer, ListenerInfo> listenerInfoMap =
new HashMap<Integer, ListenerInfo>();
private volatile boolean closed = false;
private volatile boolean startedListening = false;
// Could change synchronization here. But at worst a race will mean
// sequence numbers are not contiguous, which may not matter much.
private final AtomicLong myNotifCounter = new AtomicLong();
private final static MBeanNotificationInfo[] myInfo =
new MBeanNotificationInfo[] {
new MBeanNotificationInfo(
new String[] {FAILED, NOTIFS_LOST},
Notification.class.getName(), "")};
private final NotificationBroadcasterSupport broadcaster =
new NotificationBroadcasterSupport();
private final static Executor callerExecutor = new Executor() {
// DirectExecutor using caller thread
public void execute(Runnable r) {
r.run();
}
};
private static void checkInit(final MBeanServerConnection conn,
final ObjectName delegateName)
throws IOException {
if (conn == null) {
throw new IllegalArgumentException("No connection specified");
}
if (delegateName != null &&
(!conn.isRegistered(delegateName))) {
throw new IllegalArgumentException(
delegateName +
": not found");
}
if (delegateName == null &&
(!conn.isRegistered(
EventClientDelegate.OBJECT_NAME))) {
throw new IllegalArgumentException(
EventClientDelegate.OBJECT_NAME +
": not found");
}
}
// ----------------------------------------------------
// private event lease issues
// ----------------------------------------------------
private Callable<Long> renewLease = new Callable<Long>() {
public Long call() throws IOException, EventClientNotFoundException {
return eventClientDelegate.lease(clientId, requestedLeaseTime);
}
};
private final LeaseRenewer leaseRenewer;
// ------------------------------------------------------------------------
/**
* Constructs an {@code MBeanServerConnection} that uses an {@code EventClient} object,
* if the underlying connection has an {@link EventClientDelegateMBean}.
* <P> The {@code EventClient} object creates a default
* {@link FetchingEventRelay} object to
* receive notifications forwarded by the {@link EventClientDelegateMBean}.
* The {@link EventClientDelegateMBean} it works with is the
* default one registered with the ObjectName
* {@link EventClientDelegate#OBJECT_NAME
* OBJECT_NAME}.
* The thread from the {@link FetchingEventRelay} object that fetches the
* notifications is also used to distribute them.
*
* @param conn An {@link MBeanServerConnection} object used to communicate
* with an {@link EventClientDelegateMBean}.
* @throws IllegalArgumentException If the value of {@code conn} is null,
* or the default {@link EventClientDelegateMBean} is not registered.
* @throws IOException If an I/O error occurs.
*/
public static MBeanServerConnection getEventClientConnection(
final MBeanServerConnection conn)
throws IOException {
return getEventClientConnection(conn, null);
}
/**
* Constructs an MBeanServerConnection that uses an {@code EventClient}
* object with a user-specific {@link EventRelay}
* object.
* <P>
* The {@link EventClientDelegateMBean} which it works with is the
* default one registered with the ObjectName
* {@link EventClientDelegate#OBJECT_NAME
* OBJECT_NAME}
* The thread that calls {@link EventReceiver#receive
* EventReceiver.receive} from the {@link EventRelay} object is used
* to distribute notifications to their listeners.
*
* @param conn An {@link MBeanServerConnection} object used to communicate
* with an {@link EventClientDelegateMBean}.
* @param eventRelay A user-specific object used to receive notifications
* forwarded by the {@link EventClientDelegateMBean}. If null, the default
* {@link FetchingEventRelay} object is used.
* @throws IllegalArgumentException If the value of {@code conn} is null,
* or the default {@link EventClientDelegateMBean} is not registered.
* @throws IOException If an I/O error occurs.
*/
public static MBeanServerConnection getEventClientConnection(
final MBeanServerConnection conn,
final EventRelay eventRelay)
throws IOException {
if (newEventConn == null) {
throw new IllegalArgumentException(
"Class not found: EventClientConnection");
}
checkInit(conn,null);
final Callable<EventClient> factory = new Callable<EventClient>() {
final public EventClient call() throws Exception {
EventClientDelegateMBean ecd = EventClientDelegate.getProxy(conn);
return new EventClient(ecd, eventRelay, null, null,
DEFAULT_LEASE_TIMEOUT);
}
};
try {
return (MBeanServerConnection)newEventConn.invoke(null,
conn, factory);
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
private static Method newEventConn = null;
static {
try {
Class<?> c = Class.forName(
"com.sun.jmx.remote.util.EventClientConnection",
false, Thread.currentThread().getContextClassLoader());
newEventConn = c.getMethod("getEventConnectionFor",
MBeanServerConnection.class, Callable.class);
} catch (Exception e) {
// OK: we're running in a subset of our classes
}
}
/**
* <p>Get the client id of this {@code EventClient} in the
* {@link EventClientDelegateMBean}.
*
* @return the client id.
*
* @see EventClientDelegateMBean#addClient(String, Object[], String[])
* EventClientDelegateMBean.addClient
*/
public String getClientId() {
return clientId;
}
/**
* Returns a JMX Connector that will use an {@link EventClient}
* to subscribe for notifications. If the server doesn't have
* an {@link EventClientDelegateMBean}, then the connector will
* use the legacy notification mechanism instead.
*
* @param wrapped The underlying JMX Connector wrapped by the returned
* connector.
*
* @return A JMX Connector that will uses an {@link EventClient}, if
* available.
*
* @see EventClient#getEventClientConnection(MBeanServerConnection)
*/
public static JMXConnector withEventClient(final JMXConnector wrapped) {
return JMXNamespaceUtils.withEventClient(wrapped);
}
private static final PerThreadGroupPool<ScheduledThreadPoolExecutor>
leaseRenewerThreadPool = PerThreadGroupPool.make();
}