jdk/src/share/classes/javax/management/event/EventClient.java
changeset 4156 acaa49a2768a
parent 4155 460e37d40f12
child 4159 9e3aae7675f1
equal deleted inserted replaced
4155:460e37d40f12 4156:acaa49a2768a
     1 /*
       
     2  * Copyright 2007-2008 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Sun designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Sun in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    22  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    23  * have any questions.
       
    24  */
       
    25 
       
    26 package javax.management.event;
       
    27 
       
    28 import com.sun.jmx.event.DaemonThreadFactory;
       
    29 import com.sun.jmx.event.LeaseRenewer;
       
    30 import com.sun.jmx.event.ReceiverBuffer;
       
    31 import com.sun.jmx.event.RepeatedSingletonJob;
       
    32 import com.sun.jmx.mbeanserver.PerThreadGroupPool;
       
    33 import com.sun.jmx.remote.util.ClassLogger;
       
    34 
       
    35 import java.io.IOException;
       
    36 import java.lang.reflect.Method;
       
    37 import java.util.ArrayList;
       
    38 import java.util.Collection;
       
    39 import java.util.Collections;
       
    40 import java.util.HashMap;
       
    41 import java.util.List;
       
    42 import java.util.Map;
       
    43 import java.util.concurrent.Callable;
       
    44 import java.util.concurrent.Executor;
       
    45 
       
    46 import java.util.concurrent.ScheduledExecutorService;
       
    47 import java.util.concurrent.ScheduledThreadPoolExecutor;
       
    48 import java.util.concurrent.ThreadFactory;
       
    49 import java.util.concurrent.TimeUnit;
       
    50 import java.util.concurrent.atomic.AtomicLong;
       
    51 import javax.management.InstanceNotFoundException;
       
    52 import javax.management.ListenerNotFoundException;
       
    53 import javax.management.MBeanNotificationInfo;
       
    54 import javax.management.MBeanServerConnection;
       
    55 import javax.management.Notification;
       
    56 import javax.management.NotificationBroadcasterSupport;
       
    57 import javax.management.NotificationFilter;
       
    58 import javax.management.NotificationListener;
       
    59 import javax.management.ObjectName;
       
    60 import javax.management.remote.NotificationResult;
       
    61 import javax.management.remote.TargetedNotification;
       
    62 
       
    63 /**
       
    64  * <p>This class is used to manage its notification listeners on the client
       
    65  * side in the same way as on the MBean server side. This class needs to work
       
    66  * with an {@link EventClientDelegateMBean} on the server side.</p>
       
    67  *
       
    68  * <P>A user can specify an {@link EventRelay} object to specify how to receive
       
    69  * notifications forwarded by the {@link EventClientDelegateMBean}. By default,
       
    70  * the class {@link FetchingEventRelay} is used.</p>
       
    71  *
       
    72  * <p>A user can specify an {@link java.util.concurrent.Executor Executor}
       
    73  * to distribute notifications to local listeners. If no executor is
       
    74  * specified, the thread in the {@link EventRelay} which calls {@link
       
    75  * EventReceiver#receive EventReceiver.receive} will be reused to distribute
       
    76  * the notifications (in other words, to call the {@link
       
    77  * NotificationListener#handleNotification handleNotification} method of the
       
    78  * appropriate listeners). It is useful to make a separate thread do this
       
    79  * distribution in some cases. For example, if network communication is slow,
       
    80  * the forwarding thread can concentrate on communication while, locally,
       
    81  * the distributing thread distributes the received notifications. Another
       
    82  * usage is to share a thread pool between many clients, for scalability.
       
    83  * Note, though, that if the {@code Executor} can create more than one thread
       
    84  * then it is possible that listeners will see notifications in a different
       
    85  * order from the order in which they were sent.</p>
       
    86  *
       
    87  * <p>An object of this class sends notifications to listeners added with
       
    88  * {@link #addEventClientListener}.  The {@linkplain Notification#getType()
       
    89  * type} of each such notification is one of {@link #FAILED}, {@link #NONFATAL},
       
    90  * or {@link #NOTIFS_LOST}.</p>
       
    91  *
       
    92  * @since JMX 2.0
       
    93  */
       
    94 public class EventClient implements EventConsumer, NotificationManager {
       
    95 
       
    96     /**
       
    97      * <p>A notification string type used by an {@code EventClient} object
       
    98      * to inform a listener added by {@link #addEventClientListener} that
       
    99      * it failed to get notifications from a remote server, and that it is
       
   100      * possible that no more notifications will be delivered.</p>
       
   101      *
       
   102      * @see #addEventClientListener
       
   103      * @see EventReceiver#failed
       
   104      */
       
   105     public static final String FAILED = "jmx.event.service.failed";
       
   106 
       
   107     /**
       
   108      * <p>Reports that an unexpected exception has been received by the {@link
       
   109      * EventRelay} object but that it is non-fatal. For example, a notification
       
   110      * received is not serializable or its class is not found.</p>
       
   111      *
       
   112      * @see #addEventClientListener
       
   113      * @see EventReceiver#nonFatal
       
   114      */
       
   115     public static final String NONFATAL = "jmx.event.service.nonfatal";
       
   116 
       
   117     /**
       
   118      * <p>A notification string type used by an {@code EventClient} object
       
   119      * to inform a listener added by {@link #addEventClientListener
       
   120      * addEventClientListener} that it has detected that notifications have
       
   121      * been lost.  The {@link Notification#getUserData() userData} of the
       
   122      * notification is a Long which is an upper bound on the number of lost
       
   123      * notifications that have just been detected.</p>
       
   124      *
       
   125      * @see #addEventClientListener
       
   126      */
       
   127     public static final String NOTIFS_LOST = "jmx.event.service.notifs.lost";
       
   128 
       
   129     /**
       
   130      * The default lease time that EventClient instances will request, in
       
   131      * milliseconds.  This value is {@value}.
       
   132      *
       
   133      * @see EventClientDelegateMBean#lease
       
   134      */
       
   135     public static final long DEFAULT_REQUESTED_LEASE_TIME = 300000;
       
   136 
       
   137     /**
       
   138      * <p>Constructs a default {@code EventClient} object.</p>
       
   139      *
       
   140      * <p>This object creates a {@link FetchingEventRelay} object to
       
   141      * receive notifications forwarded by the {@link EventClientDelegateMBean}.
       
   142      * The {@link EventClientDelegateMBean} that it works with is the
       
   143      * one registered with the {@linkplain EventClientDelegate#OBJECT_NAME
       
   144      * default ObjectName}.  The thread from the {@link FetchingEventRelay}
       
   145      * object that fetches the notifications is also used to distribute them.
       
   146      *
       
   147      * @param conn An {@link MBeanServerConnection} object used to communicate
       
   148      * with an {@link EventClientDelegateMBean} MBean.
       
   149      *
       
   150      * @throws IllegalArgumentException If {@code conn} is null.
       
   151      * @throws IOException If an I/O error occurs when communicating with the
       
   152      * {@code EventClientDelegateMBean}.
       
   153      */
       
   154     public EventClient(MBeanServerConnection conn) throws IOException {
       
   155         this(EventClientDelegate.getProxy(conn));
       
   156     }
       
   157 
       
   158     /**
       
   159      * Constructs an {@code EventClient} object with a specified
       
   160      * {@link EventClientDelegateMBean}.
       
   161      *
       
   162      * <p>This object creates a {@link FetchingEventRelay} object to receive
       
   163      * notifications forwarded by the {@link EventClientDelegateMBean}.  The
       
   164      * thread from the {@link FetchingEventRelay} object that fetches the
       
   165      * notifications is also used to distribute them.
       
   166      *
       
   167      * @param delegate An {@link EventClientDelegateMBean} object to work with.
       
   168      *
       
   169      * @throws IllegalArgumentException If {@code delegate} is null.
       
   170      * @throws IOException If an I/O error occurs when communicating with the
       
   171      * the {@link EventClientDelegateMBean}.
       
   172      */
       
   173     public EventClient(EventClientDelegateMBean delegate)
       
   174     throws IOException {
       
   175         this(delegate, null, null, null, DEFAULT_REQUESTED_LEASE_TIME);
       
   176     }
       
   177 
       
   178     /**
       
   179      * Constructs an {@code EventClient} object with the specified
       
   180      * {@link EventClientDelegateMBean}, {@link EventRelay}
       
   181      * object, and distributing thread.
       
   182      *
       
   183      * @param delegate An {@link EventClientDelegateMBean} object to work with.
       
   184      * Usually, this will be a proxy constructed using
       
   185      * {@link EventClientDelegate#getProxy}.
       
   186      * @param eventRelay An object used to receive notifications
       
   187      * forwarded by the {@link EventClientDelegateMBean}. If {@code null}, a
       
   188      * {@link FetchingEventRelay} object will be used.
       
   189      * @param distributingExecutor Used to distribute notifications to local
       
   190      * listeners. Only one job at a time will be submitted to this Executor.
       
   191      * If {@code distributingExecutor} is {@code null}, the thread that calls
       
   192      * {@link EventReceiver#receive EventReceiver.receive} from the {@link
       
   193      * EventRelay} object is used.
       
   194      * @param leaseScheduler An object that will be used to schedule the
       
   195      * periodic {@linkplain EventClientDelegateMBean#lease lease updates}.
       
   196      * If {@code null}, a default scheduler will be used.
       
   197      * @param requestedLeaseTime The lease time used to keep this client alive
       
   198      * in the {@link EventClientDelegateMBean}.  A value of zero is equivalent
       
   199      * to the {@linkplain #DEFAULT_REQUESTED_LEASE_TIME default value}.
       
   200      *
       
   201      * @throws IllegalArgumentException If {@code delegate} is null.
       
   202      * @throws IOException If an I/O error occurs when communicating with the
       
   203      * {@link EventClientDelegateMBean}.
       
   204      */
       
   205     public EventClient(EventClientDelegateMBean delegate,
       
   206             EventRelay eventRelay,
       
   207             Executor distributingExecutor,
       
   208             ScheduledExecutorService leaseScheduler,
       
   209             long requestedLeaseTime)
       
   210             throws IOException {
       
   211         if (delegate == null) {
       
   212             throw new IllegalArgumentException("Null EventClientDelegateMBean");
       
   213         }
       
   214 
       
   215         if (requestedLeaseTime == 0)
       
   216             requestedLeaseTime = DEFAULT_REQUESTED_LEASE_TIME;
       
   217         else if (requestedLeaseTime < 0) {
       
   218             throw new IllegalArgumentException(
       
   219                     "Negative lease time: " + requestedLeaseTime);
       
   220         }
       
   221 
       
   222         eventClientDelegate = delegate;
       
   223 
       
   224         if (eventRelay != null) {
       
   225             this.eventRelay = eventRelay;
       
   226         } else {
       
   227             try {
       
   228                 this.eventRelay = new FetchingEventRelay(delegate);
       
   229             } catch (IOException ioe) {
       
   230                 throw ioe;
       
   231             } catch (Exception e) {
       
   232                 // impossible?
       
   233                 final IOException ioee = new IOException(e.toString());
       
   234                 ioee.initCause(e);
       
   235                 throw ioee;
       
   236             }
       
   237         }
       
   238 
       
   239         if (distributingExecutor == null)
       
   240             distributingExecutor = callerExecutor;
       
   241         this.distributingExecutor = distributingExecutor;
       
   242         this.dispatchingJob = new DispatchingJob();
       
   243 
       
   244         clientId = this.eventRelay.getClientId();
       
   245 
       
   246         this.requestedLeaseTime = requestedLeaseTime;
       
   247         if (leaseScheduler == null)
       
   248             leaseScheduler = defaultLeaseScheduler();
       
   249         leaseRenewer = new LeaseRenewer(leaseScheduler, renewLease);
       
   250 
       
   251         if (logger.traceOn()) {
       
   252             logger.trace("init", "New EventClient: "+clientId);
       
   253         }
       
   254     }
       
   255 
       
   256     private static ScheduledExecutorService defaultLeaseScheduler() {
       
   257         // The default lease scheduler uses a ScheduledThreadPoolExecutor
       
   258         // with a maximum of 20 threads.  This means that if you have many
       
   259         // EventClient instances and some of them get blocked (because of an
       
   260         // unresponsive network, for example), then even the instances that
       
   261         // are connected to responsive servers may have their leases expire.
       
   262         // XXX check if the above is true and possibly fix.
       
   263         PerThreadGroupPool.Create<ScheduledThreadPoolExecutor> create =
       
   264                 new PerThreadGroupPool.Create<ScheduledThreadPoolExecutor>() {
       
   265             public ScheduledThreadPoolExecutor createThreadPool(ThreadGroup group) {
       
   266                 ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
       
   267                         "JMX EventClient lease renewer %d");
       
   268                 ScheduledThreadPoolExecutor executor =
       
   269                         new ScheduledThreadPoolExecutor(20, daemonThreadFactory);
       
   270                 executor.setKeepAliveTime(1, TimeUnit.SECONDS);
       
   271                 executor.allowCoreThreadTimeOut(true);
       
   272                 if (setRemoveOnCancelPolicy != null) {
       
   273                     try {
       
   274                         setRemoveOnCancelPolicy.invoke(executor, true);
       
   275                     } catch (Exception e) {
       
   276                         logger.trace("setRemoveOnCancelPolicy", e);
       
   277                     }
       
   278                 }
       
   279                 // By default, a ScheduledThreadPoolExecutor will keep jobs
       
   280                 // in its queue even after they have been cancelled.  They
       
   281                 // will only be removed when their scheduled time arrives.
       
   282                 // Since the job references the LeaseRenewer which references
       
   283                 // this EventClient, this can lead to a moderately large number
       
   284                 // of objects remaining referenced until the renewal time
       
   285                 // arrives.  Hence the above call, which removes the job from
       
   286                 // the queue as soon as it is cancelled.  Since the call is
       
   287                 // new with JDK 7, we invoke it via reflection to make it
       
   288                 // easier to use this code on JDK 6.
       
   289                 return executor;
       
   290             }
       
   291         };
       
   292         return leaseRenewerThreadPool.getThreadPoolExecutor(create);
       
   293     }
       
   294 
       
   295     private static final Method setRemoveOnCancelPolicy;
       
   296     static {
       
   297         Method m;
       
   298         try {
       
   299             m = ScheduledThreadPoolExecutor.class.getMethod(
       
   300                     "setRemoveOnCancelPolicy", boolean.class);
       
   301         } catch (Exception e) {
       
   302             m = null;
       
   303         }
       
   304         setRemoveOnCancelPolicy = m;
       
   305     }
       
   306 
       
   307     /**
       
   308      * <p>Closes this EventClient, removes all listeners and stops receiving
       
   309      * notifications.</p>
       
   310      *
       
   311      * <p>This method calls {@link
       
   312      * EventClientDelegateMBean#removeClient(String)} and {@link
       
   313      * EventRelay#stop}.  Both operations occur even if one of them
       
   314      * throws an {@code IOException}.
       
   315      *
       
   316      * @throws IOException if an I/O error occurs when communicating with
       
   317      * {@link EventClientDelegateMBean}, or if {@link EventRelay#stop}
       
   318      * throws an {@code IOException}.
       
   319      */
       
   320     public void close() throws IOException {
       
   321         if (logger.traceOn()) {
       
   322             logger.trace("close", clientId);
       
   323         }
       
   324 
       
   325         synchronized(listenerInfoMap) {
       
   326             if (closed) {
       
   327                 return;
       
   328             }
       
   329 
       
   330             closed = true;
       
   331             listenerInfoMap.clear();
       
   332         }
       
   333 
       
   334         if (leaseRenewer != null)
       
   335             leaseRenewer.close();
       
   336 
       
   337         IOException ioe = null;
       
   338         try {
       
   339             eventRelay.stop();
       
   340         } catch (IOException e) {
       
   341             ioe = e;
       
   342             logger.debug("close", "EventRelay.stop", e);
       
   343         }
       
   344 
       
   345         try {
       
   346             eventClientDelegate.removeClient(clientId);
       
   347         } catch (Exception e) {
       
   348             if (e instanceof IOException)
       
   349                 ioe = (IOException) e;
       
   350             else
       
   351                 ioe = new IOException(e);
       
   352             logger.debug("close",
       
   353                     "Got exception when removing "+clientId, e);
       
   354         }
       
   355 
       
   356         if (ioe != null)
       
   357             throw ioe;
       
   358     }
       
   359 
       
   360     /**
       
   361      * <p>Determine if this {@code EventClient} is closed.</p>
       
   362      *
       
   363      * @return True if the {@code EventClient} is closed.
       
   364      */
       
   365     public boolean closed() {
       
   366         return closed;
       
   367     }
       
   368 
       
   369     /**
       
   370      * <p>Return the {@link EventRelay} associated with this
       
   371      * {@code EventClient}.</p>
       
   372      *
       
   373      * @return The {@link EventRelay} object used.
       
   374      */
       
   375     public EventRelay getEventRelay() {
       
   376         return eventRelay;
       
   377     }
       
   378 
       
   379     /**
       
   380      * <p>Return the lease time that this {@code EventClient} requests
       
   381      * on every lease renewal.</p>
       
   382      *
       
   383      * @return The requested lease time.
       
   384      *
       
   385      * @see EventClientDelegateMBean#lease
       
   386      */
       
   387     public long getRequestedLeaseTime() {
       
   388         return requestedLeaseTime;
       
   389     }
       
   390 
       
   391     /**
       
   392      * @see javax.management.MBeanServerConnection#addNotificationListener(
       
   393      * ObjectName, NotificationListener, NotificationFilter, Object).
       
   394      */
       
   395     public void addNotificationListener(ObjectName name,
       
   396             NotificationListener listener,
       
   397             NotificationFilter filter,
       
   398             Object handback)
       
   399             throws InstanceNotFoundException, IOException {
       
   400         if (logger.traceOn()) {
       
   401             logger.trace("addNotificationListener", "");
       
   402         }
       
   403 
       
   404         checkState();
       
   405 
       
   406         Integer listenerId;
       
   407         try {
       
   408             listenerId =
       
   409                     eventClientDelegate.addListener(clientId, name, filter);
       
   410         } catch (EventClientNotFoundException ecnfe) {
       
   411             final IOException ioe = new IOException(ecnfe.getMessage());
       
   412             ioe.initCause(ecnfe);
       
   413             throw ioe;
       
   414         }
       
   415 
       
   416         synchronized(listenerInfoMap) {
       
   417             listenerInfoMap.put(listenerId,  new ListenerInfo(
       
   418                     name,
       
   419                     listener,
       
   420                     filter,
       
   421                     handback,
       
   422                     false));
       
   423         }
       
   424 
       
   425         startListening();
       
   426     }
       
   427 
       
   428     /**
       
   429      * @see javax.management.MBeanServerConnection#removeNotificationListener(
       
   430      * ObjectName, NotificationListener).
       
   431      */
       
   432     public void removeNotificationListener(ObjectName name,
       
   433             NotificationListener listener)
       
   434             throws InstanceNotFoundException,
       
   435             ListenerNotFoundException,
       
   436             IOException {
       
   437         if (logger.traceOn()) {
       
   438             logger.trace("removeNotificationListener", "");
       
   439         }
       
   440         checkState();
       
   441 
       
   442         for (Integer id : getListenerInfo(name, listener, false)) {
       
   443             removeListener(id);
       
   444         }
       
   445     }
       
   446 
       
   447     /**
       
   448      * @see javax.management.MBeanServerConnection#removeNotificationListener(
       
   449      * ObjectName, NotificationListener, NotificationFilter, Object).
       
   450      */
       
   451     public void removeNotificationListener(ObjectName name,
       
   452             NotificationListener listener,
       
   453             NotificationFilter filter,
       
   454             Object handback)
       
   455             throws InstanceNotFoundException,
       
   456             ListenerNotFoundException,
       
   457             IOException {
       
   458         if (logger.traceOn()) {
       
   459             logger.trace("removeNotificationListener", "with all arguments.");
       
   460         }
       
   461         checkState();
       
   462         final Integer listenerId =
       
   463                 getListenerInfo(name, listener, filter, handback, false);
       
   464 
       
   465         removeListener(listenerId);
       
   466     }
       
   467 
       
   468     /**
       
   469      * @see javax.management.event.EventConsumer#unsubscribe(
       
   470      * ObjectName, NotificationListener).
       
   471      */
       
   472     public void unsubscribe(ObjectName name,
       
   473             NotificationListener listener)
       
   474             throws ListenerNotFoundException, IOException {
       
   475         if (logger.traceOn()) {
       
   476             logger.trace("unsubscribe", "");
       
   477         }
       
   478         checkState();
       
   479         final Integer listenerId =
       
   480                 getMatchedListenerInfo(name, listener, true);
       
   481 
       
   482         synchronized(listenerInfoMap) {
       
   483             if (listenerInfoMap.remove(listenerId) == null) {
       
   484                 throw new ListenerNotFoundException();
       
   485             }
       
   486         }
       
   487 
       
   488         stopListening();
       
   489 
       
   490         try {
       
   491             eventClientDelegate.removeListenerOrSubscriber(clientId, listenerId);
       
   492         } catch (InstanceNotFoundException e) {
       
   493             logger.trace("unsubscribe", "removeSubscriber", e);
       
   494         } catch (EventClientNotFoundException cnfe) {
       
   495             logger.trace("unsubscribe", "removeSubscriber", cnfe);
       
   496         }
       
   497     }
       
   498 
       
   499     /**
       
   500      * @see javax.management.event.EventConsumer#subscribe(
       
   501      * ObjectName, NotificationListener, NotificationFilter, Object).
       
   502      */
       
   503     public void subscribe(ObjectName name,
       
   504             NotificationListener listener,
       
   505             NotificationFilter filter,
       
   506             Object handback) throws IOException {
       
   507         if (logger.traceOn()) {
       
   508             logger.trace("subscribe", "");
       
   509         }
       
   510 
       
   511         checkState();
       
   512 
       
   513         Integer listenerId;
       
   514         try {
       
   515             listenerId =
       
   516                     eventClientDelegate.addSubscriber(clientId, name, filter);
       
   517         } catch (EventClientNotFoundException ecnfe) {
       
   518             final IOException ioe = new IOException(ecnfe.getMessage());
       
   519             ioe.initCause(ecnfe);
       
   520             throw ioe;
       
   521         }
       
   522 
       
   523         synchronized(listenerInfoMap) {
       
   524             listenerInfoMap.put(listenerId,  new ListenerInfo(
       
   525                     name,
       
   526                     listener,
       
   527                     filter,
       
   528                     handback,
       
   529                     true));
       
   530         }
       
   531 
       
   532         startListening();
       
   533     }
       
   534 
       
   535     /**
       
   536      * <p>Adds a set of listeners to the remote MBeanServer.  This method can
       
   537      * be used to copy the listeners from one {@code EventClient} to another.</p>
       
   538      *
       
   539      * <p>A listener is represented by a {@link ListenerInfo} object. The listener
       
   540      * is added by calling {@link #subscribe(ObjectName,
       
   541      * NotificationListener, NotificationFilter, Object)} if the method
       
   542      * {@link ListenerInfo#isSubscription() isSubscription}
       
   543      * returns {@code true}; otherwise it is added by calling
       
   544      * {@link #addNotificationListener(ObjectName, NotificationListener,
       
   545      * NotificationFilter, Object)}.</p>
       
   546      *
       
   547      * <P>The method returns the listeners which were added successfully. The
       
   548      * elements in the returned collection are a subset of the elements in
       
   549      * {@code listeners}. If all listeners were added successfully, the two
       
   550      * collections are the same. If no listener was added successfully, the
       
   551      * returned collection is empty.</p>
       
   552      *
       
   553      * @param listeners the listeners to add.
       
   554      *
       
   555      * @return The listeners that were added successfully.
       
   556      *
       
   557      * @throws IOException If an I/O error occurs.
       
   558      *
       
   559      * @see #getListeners()
       
   560      */
       
   561     public Collection<ListenerInfo> addListeners(Collection<ListenerInfo> listeners)
       
   562     throws IOException {
       
   563         if (logger.traceOn()) {
       
   564             logger.trace("addListeners", "");
       
   565         }
       
   566 
       
   567         checkState();
       
   568 
       
   569         if (listeners == null || listeners.isEmpty())
       
   570             return Collections.emptySet();
       
   571 
       
   572         final List<ListenerInfo> list = new ArrayList<ListenerInfo>();
       
   573         for (ListenerInfo l : listeners) {
       
   574             try {
       
   575                 if (l.isSubscription()) {
       
   576                     subscribe(l.getObjectName(),
       
   577                             l.getListener(),
       
   578                             l.getFilter(),
       
   579                             l.getHandback());
       
   580                 } else {
       
   581                     addNotificationListener(l.getObjectName(),
       
   582                             l.getListener(),
       
   583                             l.getFilter(),
       
   584                             l.getHandback());
       
   585                 }
       
   586 
       
   587                 list.add(l);
       
   588             } catch (Exception e) {
       
   589                 if (logger.traceOn()) {
       
   590                     logger.trace("addListeners", "failed to add: "+l, e);
       
   591                 }
       
   592             }
       
   593         }
       
   594 
       
   595         return list;
       
   596     }
       
   597 
       
   598     /**
       
   599      * <p>Returns the collection of listeners that have been added through
       
   600      * this {@code EventClient} and not subsequently removed.  The returned
       
   601      * collection contains one entry for every listener added with
       
   602      * {@link #addNotificationListener addNotificationListener} or
       
   603      * {@link #subscribe subscribe} and not subsequently removed with
       
   604      * {@link #removeNotificationListener removeNotificationListener} or
       
   605      * {@link #unsubscribe unsubscribe}, respectively.</p>
       
   606      *
       
   607      * @return A collection of listener information. Empty if there are no
       
   608      * current listeners or if this {@code EventClient} has been {@linkplain
       
   609      * #close closed}.
       
   610      *
       
   611      * @see #addListeners
       
   612      */
       
   613     public Collection<ListenerInfo> getListeners() {
       
   614         if (logger.traceOn()) {
       
   615             logger.trace("getListeners", "");
       
   616         }
       
   617 
       
   618         synchronized(listenerInfoMap) {
       
   619             return Collections.unmodifiableCollection(listenerInfoMap.values());
       
   620         }
       
   621     }
       
   622 
       
   623     /**
       
   624      * Adds a listener to receive the {@code EventClient} notifications specified in
       
   625      * {@link #getEventClientNotificationInfo}.
       
   626      *
       
   627      * @param listener A listener to receive {@code EventClient} notifications.
       
   628      * @param filter A filter to select which notifications are to be delivered
       
   629      * to the listener, or {@code null} if all notifications are to be delivered.
       
   630      * @param handback An object to be given to the listener along with each
       
   631      * notification. Can be null.
       
   632      * @throws NullPointerException If listener is null.
       
   633      * @see #removeEventClientListener
       
   634      */
       
   635     public void addEventClientListener(NotificationListener listener,
       
   636             NotificationFilter filter,
       
   637             Object handback) {
       
   638         if (logger.traceOn()) {
       
   639             logger.trace("addEventClientListener", "");
       
   640         }
       
   641         broadcaster.addNotificationListener(listener, filter, handback);
       
   642     }
       
   643 
       
   644     /**
       
   645      * Removes a listener added to receive {@code EventClient} notifications specified in
       
   646      * {@link #getEventClientNotificationInfo}.
       
   647      *
       
   648      * @param listener A listener to receive {@code EventClient} notifications.
       
   649      * @throws NullPointerException If listener is null.
       
   650      * @throws ListenerNotFoundException If the listener is not added to
       
   651      * this {@code EventClient}.
       
   652      */
       
   653     public void removeEventClientListener(NotificationListener listener)
       
   654     throws ListenerNotFoundException {
       
   655         if (logger.traceOn()) {
       
   656             logger.trace("removeEventClientListener", "");
       
   657         }
       
   658         broadcaster.removeNotificationListener(listener);
       
   659     }
       
   660 
       
   661     /**
       
   662      * <p>Get the types of notification that an {@code EventClient} can send
       
   663      * to listeners added with {@link #addEventClientListener
       
   664      * addEventClientListener}.</p>
       
   665      *
       
   666      * @return Types of notification emitted by this {@code EventClient}.
       
   667      *
       
   668      * @see #FAILED
       
   669      * @see #NONFATAL
       
   670      * @see #NOTIFS_LOST
       
   671      */
       
   672     public MBeanNotificationInfo[] getEventClientNotificationInfo() {
       
   673         return myInfo.clone();
       
   674     }
       
   675 
       
   676     private static boolean match(ListenerInfo li,
       
   677             ObjectName name,
       
   678             NotificationListener listener,
       
   679             boolean subscribed) {
       
   680         return li.getObjectName().equals(name) &&
       
   681                 li.getListener() == listener &&
       
   682                 li.isSubscription() == subscribed;
       
   683     }
       
   684 
       
   685     private static boolean match(ListenerInfo li,
       
   686             ObjectName name,
       
   687             NotificationListener listener,
       
   688             NotificationFilter filter,
       
   689             Object handback,
       
   690             boolean subscribed) {
       
   691         return li.getObjectName().equals(name) &&
       
   692                 li.getFilter() == filter &&
       
   693                 li.getListener() == listener &&
       
   694                 li.getHandback() == handback &&
       
   695                 li.isSubscription() == subscribed;
       
   696     }
       
   697 
       
   698 // ---------------------------------------------------
       
   699 // private classes
       
   700 // ---------------------------------------------------
       
   701     private class DispatchingJob extends RepeatedSingletonJob {
       
   702         public DispatchingJob() {
       
   703             super(distributingExecutor);
       
   704         }
       
   705 
       
   706         public boolean isSuspended() {
       
   707             return closed || buffer.size() == 0;
       
   708         }
       
   709 
       
   710         public void task() {
       
   711             TargetedNotification[] tns ;
       
   712             int lost = 0;
       
   713 
       
   714             synchronized(buffer) {
       
   715                 tns = buffer.removeNotifs();
       
   716                 lost = buffer.removeLost();
       
   717             }
       
   718 
       
   719             if ((tns == null || tns.length == 0)
       
   720             && lost == 0) {
       
   721                 return;
       
   722             }
       
   723 
       
   724             // forwarding
       
   725             if (tns != null && tns.length > 0) {
       
   726                 if (logger.traceOn()) {
       
   727                     logger.trace("DispatchingJob-task",
       
   728                             "Forwarding: "+tns.length);
       
   729                 }
       
   730                 for (TargetedNotification tn : tns) {
       
   731                     final ListenerInfo li = listenerInfoMap.get(tn.getListenerID());
       
   732                     try {
       
   733                         li.getListener().handleNotification(tn.getNotification(),
       
   734                                 li.getHandback());
       
   735                     } catch (Exception e) {
       
   736                         logger.fine(
       
   737                                 "DispatchingJob.task", "listener got exception", e);
       
   738                     }
       
   739                 }
       
   740             }
       
   741 
       
   742             if (lost > 0) {
       
   743                 if (logger.traceOn()) {
       
   744                     logger.trace("DispatchingJob-task",
       
   745                             "lost: "+lost);
       
   746                 }
       
   747                 final Notification n = new Notification(NOTIFS_LOST,
       
   748                         EventClient.this,
       
   749                         myNotifCounter.getAndIncrement(),
       
   750                         System.currentTimeMillis(),
       
   751                         "Lost notifications.");
       
   752                 n.setUserData(new Long(lost));
       
   753                 broadcaster.sendNotification(n);
       
   754             }
       
   755         }
       
   756     }
       
   757 
       
   758 
       
   759     private class EventReceiverImpl implements EventReceiver {
       
   760         public void receive(NotificationResult nr) {
       
   761             if (logger.traceOn()) {
       
   762                 logger.trace("MyEventReceiver-receive", "");
       
   763             }
       
   764 
       
   765             synchronized(buffer) {
       
   766                 buffer.addNotifs(nr);
       
   767 
       
   768                 dispatchingJob.resume();
       
   769             }
       
   770         }
       
   771 
       
   772         public void failed(Throwable t) {
       
   773             if (logger.traceOn()) {
       
   774                 logger.trace("MyEventReceiver-failed", "", t);
       
   775             }
       
   776             final Notification n = new Notification(FAILED,
       
   777                     this,
       
   778                     myNotifCounter.getAndIncrement(),
       
   779                     System.currentTimeMillis());
       
   780             n.setSource(t);
       
   781             broadcaster.sendNotification(n);
       
   782         }
       
   783 
       
   784         public void nonFatal(Exception e) {
       
   785             if (logger.traceOn()) {
       
   786                 logger.trace("MyEventReceiver-nonFatal", "", e);
       
   787             }
       
   788 
       
   789             final Notification n = new Notification(NONFATAL,
       
   790                     this,
       
   791                     myNotifCounter.getAndIncrement(),
       
   792                     System.currentTimeMillis());
       
   793             n.setSource(e);
       
   794             broadcaster.sendNotification(n);
       
   795         }
       
   796     }
       
   797 
       
   798 // ----------------------------------------------------
       
   799 // private class
       
   800 // ----------------------------------------------------
       
   801 
       
   802 
       
   803 // ----------------------------------------------------
       
   804 // private methods
       
   805 // ----------------------------------------------------
       
   806     private Integer getListenerInfo(ObjectName name,
       
   807             NotificationListener listener,
       
   808             NotificationFilter filter,
       
   809             Object handback,
       
   810             boolean subscribed) throws ListenerNotFoundException {
       
   811 
       
   812         synchronized(listenerInfoMap) {
       
   813             for (Map.Entry<Integer, ListenerInfo> entry :
       
   814                     listenerInfoMap.entrySet()) {
       
   815                 ListenerInfo li = entry.getValue();
       
   816                 if (match(li, name, listener, filter, handback, subscribed)) {
       
   817                     return entry.getKey();
       
   818                 }
       
   819             }
       
   820         }
       
   821 
       
   822         throw new ListenerNotFoundException();
       
   823     }
       
   824 
       
   825     private Integer getMatchedListenerInfo(ObjectName name,
       
   826             NotificationListener listener,
       
   827             boolean subscribed) throws ListenerNotFoundException {
       
   828 
       
   829         synchronized(listenerInfoMap) {
       
   830             for (Map.Entry<Integer, ListenerInfo> entry :
       
   831                     listenerInfoMap.entrySet()) {
       
   832                 ListenerInfo li = entry.getValue();
       
   833                 if (li.getObjectName().equals(name) &&
       
   834                         li.getListener() == listener &&
       
   835                         li.isSubscription() == subscribed) {
       
   836                     return entry.getKey();
       
   837                 }
       
   838             }
       
   839         }
       
   840 
       
   841         throw new ListenerNotFoundException();
       
   842     }
       
   843 
       
   844     private Collection<Integer> getListenerInfo(ObjectName name,
       
   845             NotificationListener listener,
       
   846             boolean subscribed) throws ListenerNotFoundException {
       
   847 
       
   848         final ArrayList<Integer> ids = new ArrayList<Integer>();
       
   849         synchronized(listenerInfoMap) {
       
   850             for (Map.Entry<Integer, ListenerInfo> entry :
       
   851                     listenerInfoMap.entrySet()) {
       
   852                 ListenerInfo li = entry.getValue();
       
   853                 if (match(li, name, listener, subscribed)) {
       
   854                     ids.add(entry.getKey());
       
   855                 }
       
   856             }
       
   857         }
       
   858 
       
   859         if (ids.isEmpty()) {
       
   860             throw new ListenerNotFoundException();
       
   861         }
       
   862 
       
   863         return ids;
       
   864     }
       
   865 
       
   866     private void checkState() throws IOException {
       
   867         synchronized(listenerInfoMap) {
       
   868             if (closed) {
       
   869                 throw new IOException("Ended!");
       
   870             }
       
   871         }
       
   872     }
       
   873 
       
   874     private void startListening() throws IOException {
       
   875         synchronized(listenerInfoMap) {
       
   876             if (!startedListening && listenerInfoMap.size() > 0) {
       
   877                 eventRelay.setEventReceiver(myReceiver);
       
   878             }
       
   879 
       
   880             startedListening = true;
       
   881 
       
   882             if (logger.traceOn()) {
       
   883                 logger.trace("startListening", "listening");
       
   884             }
       
   885         }
       
   886     }
       
   887 
       
   888     private void stopListening() throws IOException {
       
   889         synchronized(listenerInfoMap) {
       
   890             if (listenerInfoMap.size() == 0 && startedListening) {
       
   891                 eventRelay.setEventReceiver(null);
       
   892 
       
   893                 startedListening = false;
       
   894 
       
   895                 if (logger.traceOn()) {
       
   896                     logger.trace("stopListening", "non listening");
       
   897                 }
       
   898             }
       
   899         }
       
   900     }
       
   901 
       
   902     private void removeListener(Integer id)
       
   903     throws InstanceNotFoundException,
       
   904             ListenerNotFoundException,
       
   905             IOException {
       
   906         synchronized(listenerInfoMap) {
       
   907             if (listenerInfoMap.remove(id) == null) {
       
   908                 throw new ListenerNotFoundException();
       
   909             }
       
   910 
       
   911             stopListening();
       
   912         }
       
   913 
       
   914         try {
       
   915             eventClientDelegate.removeListenerOrSubscriber(clientId, id);
       
   916         } catch (EventClientNotFoundException cnfe) {
       
   917             logger.trace("removeListener", "ecd.removeListener", cnfe);
       
   918         }
       
   919     }
       
   920 
       
   921 
       
   922 // ----------------------------------------------------
       
   923 // private variables
       
   924 // ----------------------------------------------------
       
   925     private static final ClassLogger logger =
       
   926             new ClassLogger("javax.management.event", "EventClient");
       
   927 
       
   928     private final Executor distributingExecutor;
       
   929     private final EventClientDelegateMBean eventClientDelegate;
       
   930     private final EventRelay eventRelay;
       
   931     private volatile String clientId = null;
       
   932     private final long requestedLeaseTime;
       
   933 
       
   934     private final ReceiverBuffer buffer = new ReceiverBuffer();
       
   935 
       
   936     private final EventReceiverImpl myReceiver =
       
   937             new EventReceiverImpl();
       
   938     private final DispatchingJob dispatchingJob;
       
   939 
       
   940     private final HashMap<Integer, ListenerInfo> listenerInfoMap =
       
   941             new HashMap<Integer, ListenerInfo>();
       
   942 
       
   943     private volatile boolean closed = false;
       
   944 
       
   945     private volatile boolean startedListening = false;
       
   946 
       
   947     // Could change synchronization here. But at worst a race will mean
       
   948     // sequence numbers are not contiguous, which may not matter much.
       
   949     private final AtomicLong myNotifCounter = new AtomicLong();
       
   950 
       
   951     private final static MBeanNotificationInfo[] myInfo =
       
   952             new MBeanNotificationInfo[] {
       
   953         new MBeanNotificationInfo(
       
   954                 new String[] {FAILED, NONFATAL, NOTIFS_LOST},
       
   955                 Notification.class.getName(),
       
   956                 "Notifications that can be sent to a listener added with " +
       
   957                 "EventClient.addEventClientListener")};
       
   958 
       
   959     private final NotificationBroadcasterSupport broadcaster =
       
   960             new NotificationBroadcasterSupport();
       
   961 
       
   962     private final static Executor callerExecutor = new Executor() {
       
   963         // DirectExecutor using caller thread
       
   964         public void execute(Runnable r) {
       
   965             r.run();
       
   966         }
       
   967     };
       
   968 
       
   969     private static void checkInit(final MBeanServerConnection conn,
       
   970             final ObjectName delegateName)
       
   971             throws IOException {
       
   972         if (conn == null) {
       
   973             throw new IllegalArgumentException("No connection specified");
       
   974         }
       
   975         if (delegateName != null &&
       
   976                 (!conn.isRegistered(delegateName))) {
       
   977             throw new IllegalArgumentException(
       
   978                     delegateName +
       
   979                     ": not found");
       
   980         }
       
   981         if (delegateName == null &&
       
   982                 (!conn.isRegistered(
       
   983                 EventClientDelegate.OBJECT_NAME))) {
       
   984             throw new IllegalArgumentException(
       
   985                     EventClientDelegate.OBJECT_NAME +
       
   986                     ": not found");
       
   987         }
       
   988     }
       
   989 
       
   990 // ----------------------------------------------------
       
   991 // private event lease issues
       
   992 // ----------------------------------------------------
       
   993     private Callable<Long> renewLease = new Callable<Long>() {
       
   994         public Long call() throws IOException, EventClientNotFoundException {
       
   995             return eventClientDelegate.lease(clientId, requestedLeaseTime);
       
   996         }
       
   997     };
       
   998 
       
   999     private final LeaseRenewer leaseRenewer;
       
  1000 
       
  1001 // ------------------------------------------------------------------------
       
  1002     /**
       
  1003      * Constructs an {@code MBeanServerConnection} that uses an {@code EventClient} object,
       
  1004      * if the underlying connection has an {@link EventClientDelegateMBean}.
       
  1005      * <P> The {@code EventClient} object creates a default
       
  1006      * {@link FetchingEventRelay} object to
       
  1007      * receive notifications forwarded by the {@link EventClientDelegateMBean}.
       
  1008      * The {@link EventClientDelegateMBean} it works with is the
       
  1009      * default one registered with the ObjectName
       
  1010      * {@link EventClientDelegate#OBJECT_NAME
       
  1011      * OBJECT_NAME}.
       
  1012      * The thread from the {@link FetchingEventRelay} object that fetches the
       
  1013      * notifications is also used to distribute them.
       
  1014      *
       
  1015      * @param conn An {@link MBeanServerConnection} object used to communicate
       
  1016      * with an {@link EventClientDelegateMBean}.
       
  1017      * @throws IllegalArgumentException If the value of {@code conn} is null,
       
  1018      *         or the default {@link EventClientDelegateMBean} is not registered.
       
  1019      * @throws IOException If an I/O error occurs.
       
  1020      */
       
  1021     public static MBeanServerConnection getEventClientConnection(
       
  1022             final MBeanServerConnection conn)
       
  1023             throws IOException {
       
  1024         return getEventClientConnection(conn, null);
       
  1025     }
       
  1026 
       
  1027     /**
       
  1028      * Constructs an MBeanServerConnection that uses an {@code EventClient}
       
  1029      * object with a user-specific {@link EventRelay}
       
  1030      * object.
       
  1031      * <P>
       
  1032      * The {@link EventClientDelegateMBean} which it works with is the
       
  1033      * default one registered with the ObjectName
       
  1034      * {@link EventClientDelegate#OBJECT_NAME
       
  1035      * OBJECT_NAME}
       
  1036      * The thread that calls {@link EventReceiver#receive
       
  1037      * EventReceiver.receive} from the {@link EventRelay} object is used
       
  1038      * to distribute notifications to their listeners.
       
  1039      *
       
  1040      * @param conn An {@link MBeanServerConnection} object used to communicate
       
  1041      * with an {@link EventClientDelegateMBean}.
       
  1042      * @param eventRelay A user-specific object used to receive notifications
       
  1043      * forwarded by the {@link EventClientDelegateMBean}. If null, the default
       
  1044      * {@link FetchingEventRelay} object is used.
       
  1045      * @throws IllegalArgumentException If the value of {@code conn} is null,
       
  1046      *         or the default {@link EventClientDelegateMBean} is not registered.
       
  1047      * @throws IOException If an I/O error occurs.
       
  1048      */
       
  1049     public static MBeanServerConnection getEventClientConnection(
       
  1050             final MBeanServerConnection conn,
       
  1051             final EventRelay eventRelay)
       
  1052             throws IOException {
       
  1053 
       
  1054         if (newEventConn == null) {
       
  1055             throw new IllegalArgumentException(
       
  1056                     "Class not found: EventClientConnection");
       
  1057         }
       
  1058 
       
  1059         checkInit(conn,null);
       
  1060         final Callable<EventClient> factory = new Callable<EventClient>() {
       
  1061             final public EventClient call() throws Exception {
       
  1062                 EventClientDelegateMBean ecd = EventClientDelegate.getProxy(conn);
       
  1063                 return new EventClient(ecd, eventRelay, null, null,
       
  1064                         DEFAULT_REQUESTED_LEASE_TIME);
       
  1065             }
       
  1066         };
       
  1067 
       
  1068         try {
       
  1069             return (MBeanServerConnection)newEventConn.invoke(null,
       
  1070                     conn, factory);
       
  1071         } catch (Exception e) {
       
  1072             throw new IllegalArgumentException(e);
       
  1073         }
       
  1074     }
       
  1075 
       
  1076     private static Method newEventConn = null;
       
  1077     static {
       
  1078         try {
       
  1079             Class<?> c = Class.forName(
       
  1080                     "com.sun.jmx.remote.util.EventClientConnection",
       
  1081                     false, Thread.currentThread().getContextClassLoader());
       
  1082             newEventConn = c.getMethod("getEventConnectionFor",
       
  1083                     MBeanServerConnection.class, Callable.class);
       
  1084         } catch (Exception e) {
       
  1085             // OK: we're running in a subset of our classes
       
  1086         }
       
  1087     }
       
  1088 
       
  1089     /**
       
  1090      * <p>Get the client id of this {@code EventClient} in the
       
  1091      * {@link EventClientDelegateMBean}.
       
  1092      *
       
  1093      * @return the client id.
       
  1094      *
       
  1095      * @see EventClientDelegateMBean#addClient(String, Object[], String[])
       
  1096      * EventClientDelegateMBean.addClient
       
  1097      */
       
  1098     public String getClientId() {
       
  1099         return clientId;
       
  1100     }
       
  1101 
       
  1102     private static final PerThreadGroupPool<ScheduledThreadPoolExecutor>
       
  1103             leaseRenewerThreadPool = PerThreadGroupPool.make();
       
  1104 }