1 /* |
|
2 * Copyright 2007-2008 Sun Microsystems, Inc. All Rights Reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Sun designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Sun in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, |
|
22 * CA 95054 USA or visit www.sun.com if you need additional information or |
|
23 * have any questions. |
|
24 */ |
|
25 |
|
26 package javax.management.event; |
|
27 |
|
28 import com.sun.jmx.event.DaemonThreadFactory; |
|
29 import com.sun.jmx.event.LeaseRenewer; |
|
30 import com.sun.jmx.event.ReceiverBuffer; |
|
31 import com.sun.jmx.event.RepeatedSingletonJob; |
|
32 import com.sun.jmx.mbeanserver.PerThreadGroupPool; |
|
33 import com.sun.jmx.remote.util.ClassLogger; |
|
34 |
|
35 import java.io.IOException; |
|
36 import java.lang.reflect.Method; |
|
37 import java.util.ArrayList; |
|
38 import java.util.Collection; |
|
39 import java.util.Collections; |
|
40 import java.util.HashMap; |
|
41 import java.util.List; |
|
42 import java.util.Map; |
|
43 import java.util.concurrent.Callable; |
|
44 import java.util.concurrent.Executor; |
|
45 |
|
46 import java.util.concurrent.ScheduledExecutorService; |
|
47 import java.util.concurrent.ScheduledThreadPoolExecutor; |
|
48 import java.util.concurrent.ThreadFactory; |
|
49 import java.util.concurrent.TimeUnit; |
|
50 import java.util.concurrent.atomic.AtomicLong; |
|
51 import javax.management.InstanceNotFoundException; |
|
52 import javax.management.ListenerNotFoundException; |
|
53 import javax.management.MBeanNotificationInfo; |
|
54 import javax.management.MBeanServerConnection; |
|
55 import javax.management.Notification; |
|
56 import javax.management.NotificationBroadcasterSupport; |
|
57 import javax.management.NotificationFilter; |
|
58 import javax.management.NotificationListener; |
|
59 import javax.management.ObjectName; |
|
60 import javax.management.remote.NotificationResult; |
|
61 import javax.management.remote.TargetedNotification; |
|
62 |
|
63 /** |
|
64 * <p>This class is used to manage its notification listeners on the client |
|
65 * side in the same way as on the MBean server side. This class needs to work |
|
66 * with an {@link EventClientDelegateMBean} on the server side.</p> |
|
67 * |
|
68 * <P>A user can specify an {@link EventRelay} object to specify how to receive |
|
69 * notifications forwarded by the {@link EventClientDelegateMBean}. By default, |
|
70 * the class {@link FetchingEventRelay} is used.</p> |
|
71 * |
|
72 * <p>A user can specify an {@link java.util.concurrent.Executor Executor} |
|
73 * to distribute notifications to local listeners. If no executor is |
|
74 * specified, the thread in the {@link EventRelay} which calls {@link |
|
75 * EventReceiver#receive EventReceiver.receive} will be reused to distribute |
|
76 * the notifications (in other words, to call the {@link |
|
77 * NotificationListener#handleNotification handleNotification} method of the |
|
78 * appropriate listeners). It is useful to make a separate thread do this |
|
79 * distribution in some cases. For example, if network communication is slow, |
|
80 * the forwarding thread can concentrate on communication while, locally, |
|
81 * the distributing thread distributes the received notifications. Another |
|
82 * usage is to share a thread pool between many clients, for scalability. |
|
83 * Note, though, that if the {@code Executor} can create more than one thread |
|
84 * then it is possible that listeners will see notifications in a different |
|
85 * order from the order in which they were sent.</p> |
|
86 * |
|
87 * <p>An object of this class sends notifications to listeners added with |
|
88 * {@link #addEventClientListener}. The {@linkplain Notification#getType() |
|
89 * type} of each such notification is one of {@link #FAILED}, {@link #NONFATAL}, |
|
90 * or {@link #NOTIFS_LOST}.</p> |
|
91 * |
|
92 * @since JMX 2.0 |
|
93 */ |
|
94 public class EventClient implements EventConsumer, NotificationManager { |
|
95 |
|
96 /** |
|
97 * <p>A notification string type used by an {@code EventClient} object |
|
98 * to inform a listener added by {@link #addEventClientListener} that |
|
99 * it failed to get notifications from a remote server, and that it is |
|
100 * possible that no more notifications will be delivered.</p> |
|
101 * |
|
102 * @see #addEventClientListener |
|
103 * @see EventReceiver#failed |
|
104 */ |
|
105 public static final String FAILED = "jmx.event.service.failed"; |
|
106 |
|
107 /** |
|
108 * <p>Reports that an unexpected exception has been received by the {@link |
|
109 * EventRelay} object but that it is non-fatal. For example, a notification |
|
110 * received is not serializable or its class is not found.</p> |
|
111 * |
|
112 * @see #addEventClientListener |
|
113 * @see EventReceiver#nonFatal |
|
114 */ |
|
115 public static final String NONFATAL = "jmx.event.service.nonfatal"; |
|
116 |
|
117 /** |
|
118 * <p>A notification string type used by an {@code EventClient} object |
|
119 * to inform a listener added by {@link #addEventClientListener |
|
120 * addEventClientListener} that it has detected that notifications have |
|
121 * been lost. The {@link Notification#getUserData() userData} of the |
|
122 * notification is a Long which is an upper bound on the number of lost |
|
123 * notifications that have just been detected.</p> |
|
124 * |
|
125 * @see #addEventClientListener |
|
126 */ |
|
127 public static final String NOTIFS_LOST = "jmx.event.service.notifs.lost"; |
|
128 |
|
129 /** |
|
130 * The default lease time that EventClient instances will request, in |
|
131 * milliseconds. This value is {@value}. |
|
132 * |
|
133 * @see EventClientDelegateMBean#lease |
|
134 */ |
|
135 public static final long DEFAULT_REQUESTED_LEASE_TIME = 300000; |
|
136 |
|
/**
 * <p>Creates an {@code EventClient} with all default settings.</p>
 *
 * <p>The client talks to the {@link EventClientDelegateMBean} registered
 * under the {@linkplain EventClientDelegate#OBJECT_NAME default
 * ObjectName}, reached through a proxy obtained from
 * {@link EventClientDelegate#getProxy}. A {@link FetchingEventRelay} is
 * created to receive the forwarded notifications, and the relay thread
 * that fetches them also distributes them to listeners.</p>
 *
 * @param conn the {@link MBeanServerConnection} used to reach the
 * {@link EventClientDelegateMBean}.
 *
 * @throws IllegalArgumentException if {@code conn} is null.
 * @throws IOException if an I/O error occurs when communicating with the
 * {@code EventClientDelegateMBean}.
 */
public EventClient(MBeanServerConnection conn) throws IOException {
    this(EventClientDelegate.getProxy(conn));
}
|
157 |
|
/**
 * Creates an {@code EventClient} bound to the given
 * {@link EventClientDelegateMBean}, using defaults for everything else.
 *
 * <p>A {@link FetchingEventRelay} is created to receive the notifications
 * forwarded by the delegate, the relay thread that fetches them also
 * distributes them, the default lease scheduler is used, and the lease
 * time requested is {@link #DEFAULT_REQUESTED_LEASE_TIME}.</p>
 *
 * @param delegate the {@link EventClientDelegateMBean} to work with.
 *
 * @throws IllegalArgumentException if {@code delegate} is null.
 * @throws IOException if an I/O error occurs when communicating with
 * the {@link EventClientDelegateMBean}.
 */
public EventClient(EventClientDelegateMBean delegate)
        throws IOException {
    this(delegate, null, null, null, DEFAULT_REQUESTED_LEASE_TIME);
}
|
177 |
|
/**
 * Creates an {@code EventClient} with an explicit
 * {@link EventClientDelegateMBean}, {@link EventRelay}, distributing
 * executor, lease scheduler and lease time.
 *
 * @param delegate the {@link EventClientDelegateMBean} to work with.
 * Usually this is a proxy built with {@link EventClientDelegate#getProxy}.
 * @param eventRelay the object that receives the notifications forwarded
 * by the {@link EventClientDelegateMBean}. If {@code null}, a
 * {@link FetchingEventRelay} is created.
 * @param distributingExecutor used to deliver notifications to local
 * listeners; only one job at a time is submitted to it. If {@code null},
 * delivery happens in the thread of the {@link EventRelay} that calls
 * {@link EventReceiver#receive EventReceiver.receive}.
 * @param leaseScheduler used to schedule the periodic
 * {@linkplain EventClientDelegateMBean#lease lease updates}. If
 * {@code null}, a default scheduler is used.
 * @param requestedLeaseTime the lease time used to keep this client alive
 * in the {@link EventClientDelegateMBean}. Zero means the
 * {@linkplain #DEFAULT_REQUESTED_LEASE_TIME default value}.
 *
 * @throws IllegalArgumentException if {@code delegate} is null or
 * {@code requestedLeaseTime} is negative.
 * @throws IOException if an I/O error occurs when communicating with the
 * {@link EventClientDelegateMBean}.
 */
public EventClient(EventClientDelegateMBean delegate,
        EventRelay eventRelay,
        Executor distributingExecutor,
        ScheduledExecutorService leaseScheduler,
        long requestedLeaseTime)
        throws IOException {
    // Validate the arguments before touching any state.
    if (delegate == null) {
        throw new IllegalArgumentException("Null EventClientDelegateMBean");
    }
    if (requestedLeaseTime < 0) {
        throw new IllegalArgumentException(
                "Negative lease time: " + requestedLeaseTime);
    }
    final long effectiveLeaseTime = (requestedLeaseTime == 0)
            ? DEFAULT_REQUESTED_LEASE_TIME
            : requestedLeaseTime;

    eventClientDelegate = delegate;

    // Fall back to a fetching relay when the caller did not supply one.
    EventRelay relay = eventRelay;
    if (relay == null) {
        try {
            relay = new FetchingEventRelay(delegate);
        } catch (IOException ioe) {
            throw ioe;
        } catch (Exception e) {
            // impossible?
            final IOException wrapped = new IOException(e.toString());
            wrapped.initCause(e);
            throw wrapped;
        }
    }
    this.eventRelay = relay;

    // The executor must be in place before the dispatching job is built,
    // because the job's constructor reads it.
    this.distributingExecutor = (distributingExecutor != null)
            ? distributingExecutor
            : callerExecutor;
    this.dispatchingJob = new DispatchingJob();

    clientId = relay.getClientId();

    this.requestedLeaseTime = effectiveLeaseTime;
    final ScheduledExecutorService scheduler = (leaseScheduler != null)
            ? leaseScheduler
            : defaultLeaseScheduler();
    leaseRenewer = new LeaseRenewer(scheduler, renewLease);

    if (logger.traceOn()) {
        logger.trace("init", "New EventClient: "+clientId);
    }
}
|
255 |
|
256 private static ScheduledExecutorService defaultLeaseScheduler() { |
|
257 // The default lease scheduler uses a ScheduledThreadPoolExecutor |
|
258 // with a maximum of 20 threads. This means that if you have many |
|
259 // EventClient instances and some of them get blocked (because of an |
|
260 // unresponsive network, for example), then even the instances that |
|
261 // are connected to responsive servers may have their leases expire. |
|
262 // XXX check if the above is true and possibly fix. |
|
263 PerThreadGroupPool.Create<ScheduledThreadPoolExecutor> create = |
|
264 new PerThreadGroupPool.Create<ScheduledThreadPoolExecutor>() { |
|
265 public ScheduledThreadPoolExecutor createThreadPool(ThreadGroup group) { |
|
266 ThreadFactory daemonThreadFactory = new DaemonThreadFactory( |
|
267 "JMX EventClient lease renewer %d"); |
|
268 ScheduledThreadPoolExecutor executor = |
|
269 new ScheduledThreadPoolExecutor(20, daemonThreadFactory); |
|
270 executor.setKeepAliveTime(1, TimeUnit.SECONDS); |
|
271 executor.allowCoreThreadTimeOut(true); |
|
272 if (setRemoveOnCancelPolicy != null) { |
|
273 try { |
|
274 setRemoveOnCancelPolicy.invoke(executor, true); |
|
275 } catch (Exception e) { |
|
276 logger.trace("setRemoveOnCancelPolicy", e); |
|
277 } |
|
278 } |
|
279 // By default, a ScheduledThreadPoolExecutor will keep jobs |
|
280 // in its queue even after they have been cancelled. They |
|
281 // will only be removed when their scheduled time arrives. |
|
282 // Since the job references the LeaseRenewer which references |
|
283 // this EventClient, this can lead to a moderately large number |
|
284 // of objects remaining referenced until the renewal time |
|
285 // arrives. Hence the above call, which removes the job from |
|
286 // the queue as soon as it is cancelled. Since the call is |
|
287 // new with JDK 7, we invoke it via reflection to make it |
|
288 // easier to use this code on JDK 6. |
|
289 return executor; |
|
290 } |
|
291 }; |
|
292 return leaseRenewerThreadPool.getThreadPoolExecutor(create); |
|
293 } |
|
294 |
|
/**
 * Reflective handle on
 * {@code ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(boolean)},
 * or {@code null} when the lookup fails (for example because the running
 * JDK does not provide that method).
 */
private static final Method setRemoveOnCancelPolicy;
static {
    Method found = null;
    try {
        found = ScheduledThreadPoolExecutor.class.getMethod(
                "setRemoveOnCancelPolicy", boolean.class);
    } catch (Exception ignored) {
        // Method not available: leave the handle null so callers skip it.
    }
    setRemoveOnCancelPolicy = found;
}
|
306 |
|
307 /** |
|
308 * <p>Closes this EventClient, removes all listeners and stops receiving |
|
309 * notifications.</p> |
|
310 * |
|
311 * <p>This method calls {@link |
|
312 * EventClientDelegateMBean#removeClient(String)} and {@link |
|
313 * EventRelay#stop}. Both operations occur even if one of them |
|
314 * throws an {@code IOException}. |
|
315 * |
|
316 * @throws IOException if an I/O error occurs when communicating with |
|
317 * {@link EventClientDelegateMBean}, or if {@link EventRelay#stop} |
|
318 * throws an {@code IOException}. |
|
319 */ |
|
320 public void close() throws IOException { |
|
321 if (logger.traceOn()) { |
|
322 logger.trace("close", clientId); |
|
323 } |
|
324 |
|
325 synchronized(listenerInfoMap) { |
|
326 if (closed) { |
|
327 return; |
|
328 } |
|
329 |
|
330 closed = true; |
|
331 listenerInfoMap.clear(); |
|
332 } |
|
333 |
|
334 if (leaseRenewer != null) |
|
335 leaseRenewer.close(); |
|
336 |
|
337 IOException ioe = null; |
|
338 try { |
|
339 eventRelay.stop(); |
|
340 } catch (IOException e) { |
|
341 ioe = e; |
|
342 logger.debug("close", "EventRelay.stop", e); |
|
343 } |
|
344 |
|
345 try { |
|
346 eventClientDelegate.removeClient(clientId); |
|
347 } catch (Exception e) { |
|
348 if (e instanceof IOException) |
|
349 ioe = (IOException) e; |
|
350 else |
|
351 ioe = new IOException(e); |
|
352 logger.debug("close", |
|
353 "Got exception when removing "+clientId, e); |
|
354 } |
|
355 |
|
356 if (ioe != null) |
|
357 throw ioe; |
|
358 } |
|
359 |
|
360 /** |
|
361 * <p>Determine if this {@code EventClient} is closed.</p> |
|
362 * |
|
363 * @return True if the {@code EventClient} is closed. |
|
364 */ |
|
365 public boolean closed() { |
|
366 return closed; |
|
367 } |
|
368 |
|
369 /** |
|
370 * <p>Return the {@link EventRelay} associated with this |
|
371 * {@code EventClient}.</p> |
|
372 * |
|
373 * @return The {@link EventRelay} object used. |
|
374 */ |
|
375 public EventRelay getEventRelay() { |
|
376 return eventRelay; |
|
377 } |
|
378 |
|
379 /** |
|
380 * <p>Return the lease time that this {@code EventClient} requests |
|
381 * on every lease renewal.</p> |
|
382 * |
|
383 * @return The requested lease time. |
|
384 * |
|
385 * @see EventClientDelegateMBean#lease |
|
386 */ |
|
387 public long getRequestedLeaseTime() { |
|
388 return requestedLeaseTime; |
|
389 } |
|
390 |
|
391 /** |
|
392 * @see javax.management.MBeanServerConnection#addNotificationListener( |
|
393 * ObjectName, NotificationListener, NotificationFilter, Object). |
|
394 */ |
|
395 public void addNotificationListener(ObjectName name, |
|
396 NotificationListener listener, |
|
397 NotificationFilter filter, |
|
398 Object handback) |
|
399 throws InstanceNotFoundException, IOException { |
|
400 if (logger.traceOn()) { |
|
401 logger.trace("addNotificationListener", ""); |
|
402 } |
|
403 |
|
404 checkState(); |
|
405 |
|
406 Integer listenerId; |
|
407 try { |
|
408 listenerId = |
|
409 eventClientDelegate.addListener(clientId, name, filter); |
|
410 } catch (EventClientNotFoundException ecnfe) { |
|
411 final IOException ioe = new IOException(ecnfe.getMessage()); |
|
412 ioe.initCause(ecnfe); |
|
413 throw ioe; |
|
414 } |
|
415 |
|
416 synchronized(listenerInfoMap) { |
|
417 listenerInfoMap.put(listenerId, new ListenerInfo( |
|
418 name, |
|
419 listener, |
|
420 filter, |
|
421 handback, |
|
422 false)); |
|
423 } |
|
424 |
|
425 startListening(); |
|
426 } |
|
427 |
|
428 /** |
|
429 * @see javax.management.MBeanServerConnection#removeNotificationListener( |
|
430 * ObjectName, NotificationListener). |
|
431 */ |
|
432 public void removeNotificationListener(ObjectName name, |
|
433 NotificationListener listener) |
|
434 throws InstanceNotFoundException, |
|
435 ListenerNotFoundException, |
|
436 IOException { |
|
437 if (logger.traceOn()) { |
|
438 logger.trace("removeNotificationListener", ""); |
|
439 } |
|
440 checkState(); |
|
441 |
|
442 for (Integer id : getListenerInfo(name, listener, false)) { |
|
443 removeListener(id); |
|
444 } |
|
445 } |
|
446 |
|
447 /** |
|
448 * @see javax.management.MBeanServerConnection#removeNotificationListener( |
|
449 * ObjectName, NotificationListener, NotificationFilter, Object). |
|
450 */ |
|
451 public void removeNotificationListener(ObjectName name, |
|
452 NotificationListener listener, |
|
453 NotificationFilter filter, |
|
454 Object handback) |
|
455 throws InstanceNotFoundException, |
|
456 ListenerNotFoundException, |
|
457 IOException { |
|
458 if (logger.traceOn()) { |
|
459 logger.trace("removeNotificationListener", "with all arguments."); |
|
460 } |
|
461 checkState(); |
|
462 final Integer listenerId = |
|
463 getListenerInfo(name, listener, filter, handback, false); |
|
464 |
|
465 removeListener(listenerId); |
|
466 } |
|
467 |
|
468 /** |
|
469 * @see javax.management.event.EventConsumer#unsubscribe( |
|
470 * ObjectName, NotificationListener). |
|
471 */ |
|
472 public void unsubscribe(ObjectName name, |
|
473 NotificationListener listener) |
|
474 throws ListenerNotFoundException, IOException { |
|
475 if (logger.traceOn()) { |
|
476 logger.trace("unsubscribe", ""); |
|
477 } |
|
478 checkState(); |
|
479 final Integer listenerId = |
|
480 getMatchedListenerInfo(name, listener, true); |
|
481 |
|
482 synchronized(listenerInfoMap) { |
|
483 if (listenerInfoMap.remove(listenerId) == null) { |
|
484 throw new ListenerNotFoundException(); |
|
485 } |
|
486 } |
|
487 |
|
488 stopListening(); |
|
489 |
|
490 try { |
|
491 eventClientDelegate.removeListenerOrSubscriber(clientId, listenerId); |
|
492 } catch (InstanceNotFoundException e) { |
|
493 logger.trace("unsubscribe", "removeSubscriber", e); |
|
494 } catch (EventClientNotFoundException cnfe) { |
|
495 logger.trace("unsubscribe", "removeSubscriber", cnfe); |
|
496 } |
|
497 } |
|
498 |
|
499 /** |
|
500 * @see javax.management.event.EventConsumer#subscribe( |
|
501 * ObjectName, NotificationListener, NotificationFilter, Object). |
|
502 */ |
|
503 public void subscribe(ObjectName name, |
|
504 NotificationListener listener, |
|
505 NotificationFilter filter, |
|
506 Object handback) throws IOException { |
|
507 if (logger.traceOn()) { |
|
508 logger.trace("subscribe", ""); |
|
509 } |
|
510 |
|
511 checkState(); |
|
512 |
|
513 Integer listenerId; |
|
514 try { |
|
515 listenerId = |
|
516 eventClientDelegate.addSubscriber(clientId, name, filter); |
|
517 } catch (EventClientNotFoundException ecnfe) { |
|
518 final IOException ioe = new IOException(ecnfe.getMessage()); |
|
519 ioe.initCause(ecnfe); |
|
520 throw ioe; |
|
521 } |
|
522 |
|
523 synchronized(listenerInfoMap) { |
|
524 listenerInfoMap.put(listenerId, new ListenerInfo( |
|
525 name, |
|
526 listener, |
|
527 filter, |
|
528 handback, |
|
529 true)); |
|
530 } |
|
531 |
|
532 startListening(); |
|
533 } |
|
534 |
|
535 /** |
|
536 * <p>Adds a set of listeners to the remote MBeanServer. This method can |
|
537 * be used to copy the listeners from one {@code EventClient} to another.</p> |
|
538 * |
|
539 * <p>A listener is represented by a {@link ListenerInfo} object. The listener |
|
540 * is added by calling {@link #subscribe(ObjectName, |
|
541 * NotificationListener, NotificationFilter, Object)} if the method |
|
542 * {@link ListenerInfo#isSubscription() isSubscription} |
|
543 * returns {@code true}; otherwise it is added by calling |
|
544 * {@link #addNotificationListener(ObjectName, NotificationListener, |
|
545 * NotificationFilter, Object)}.</p> |
|
546 * |
|
547 * <P>The method returns the listeners which were added successfully. The |
|
548 * elements in the returned collection are a subset of the elements in |
|
549 * {@code listeners}. If all listeners were added successfully, the two |
|
550 * collections are the same. If no listener was added successfully, the |
|
551 * returned collection is empty.</p> |
|
552 * |
|
553 * @param listeners the listeners to add. |
|
554 * |
|
555 * @return The listeners that were added successfully. |
|
556 * |
|
557 * @throws IOException If an I/O error occurs. |
|
558 * |
|
559 * @see #getListeners() |
|
560 */ |
|
561 public Collection<ListenerInfo> addListeners(Collection<ListenerInfo> listeners) |
|
562 throws IOException { |
|
563 if (logger.traceOn()) { |
|
564 logger.trace("addListeners", ""); |
|
565 } |
|
566 |
|
567 checkState(); |
|
568 |
|
569 if (listeners == null || listeners.isEmpty()) |
|
570 return Collections.emptySet(); |
|
571 |
|
572 final List<ListenerInfo> list = new ArrayList<ListenerInfo>(); |
|
573 for (ListenerInfo l : listeners) { |
|
574 try { |
|
575 if (l.isSubscription()) { |
|
576 subscribe(l.getObjectName(), |
|
577 l.getListener(), |
|
578 l.getFilter(), |
|
579 l.getHandback()); |
|
580 } else { |
|
581 addNotificationListener(l.getObjectName(), |
|
582 l.getListener(), |
|
583 l.getFilter(), |
|
584 l.getHandback()); |
|
585 } |
|
586 |
|
587 list.add(l); |
|
588 } catch (Exception e) { |
|
589 if (logger.traceOn()) { |
|
590 logger.trace("addListeners", "failed to add: "+l, e); |
|
591 } |
|
592 } |
|
593 } |
|
594 |
|
595 return list; |
|
596 } |
|
597 |
|
598 /** |
|
599 * <p>Returns the collection of listeners that have been added through |
|
600 * this {@code EventClient} and not subsequently removed. The returned |
|
601 * collection contains one entry for every listener added with |
|
602 * {@link #addNotificationListener addNotificationListener} or |
|
603 * {@link #subscribe subscribe} and not subsequently removed with |
|
604 * {@link #removeNotificationListener removeNotificationListener} or |
|
605 * {@link #unsubscribe unsubscribe}, respectively.</p> |
|
606 * |
|
607 * @return A collection of listener information. Empty if there are no |
|
608 * current listeners or if this {@code EventClient} has been {@linkplain |
|
609 * #close closed}. |
|
610 * |
|
611 * @see #addListeners |
|
612 */ |
|
613 public Collection<ListenerInfo> getListeners() { |
|
614 if (logger.traceOn()) { |
|
615 logger.trace("getListeners", ""); |
|
616 } |
|
617 |
|
618 synchronized(listenerInfoMap) { |
|
619 return Collections.unmodifiableCollection(listenerInfoMap.values()); |
|
620 } |
|
621 } |
|
622 |
|
623 /** |
|
624 * Adds a listener to receive the {@code EventClient} notifications specified in |
|
625 * {@link #getEventClientNotificationInfo}. |
|
626 * |
|
627 * @param listener A listener to receive {@code EventClient} notifications. |
|
628 * @param filter A filter to select which notifications are to be delivered |
|
629 * to the listener, or {@code null} if all notifications are to be delivered. |
|
630 * @param handback An object to be given to the listener along with each |
|
631 * notification. Can be null. |
|
632 * @throws NullPointerException If listener is null. |
|
633 * @see #removeEventClientListener |
|
634 */ |
|
635 public void addEventClientListener(NotificationListener listener, |
|
636 NotificationFilter filter, |
|
637 Object handback) { |
|
638 if (logger.traceOn()) { |
|
639 logger.trace("addEventClientListener", ""); |
|
640 } |
|
641 broadcaster.addNotificationListener(listener, filter, handback); |
|
642 } |
|
643 |
|
644 /** |
|
645 * Removes a listener added to receive {@code EventClient} notifications specified in |
|
646 * {@link #getEventClientNotificationInfo}. |
|
647 * |
|
648 * @param listener A listener to receive {@code EventClient} notifications. |
|
649 * @throws NullPointerException If listener is null. |
|
650 * @throws ListenerNotFoundException If the listener is not added to |
|
651 * this {@code EventClient}. |
|
652 */ |
|
653 public void removeEventClientListener(NotificationListener listener) |
|
654 throws ListenerNotFoundException { |
|
655 if (logger.traceOn()) { |
|
656 logger.trace("removeEventClientListener", ""); |
|
657 } |
|
658 broadcaster.removeNotificationListener(listener); |
|
659 } |
|
660 |
|
661 /** |
|
662 * <p>Get the types of notification that an {@code EventClient} can send |
|
663 * to listeners added with {@link #addEventClientListener |
|
664 * addEventClientListener}.</p> |
|
665 * |
|
666 * @return Types of notification emitted by this {@code EventClient}. |
|
667 * |
|
668 * @see #FAILED |
|
669 * @see #NONFATAL |
|
670 * @see #NOTIFS_LOST |
|
671 */ |
|
672 public MBeanNotificationInfo[] getEventClientNotificationInfo() { |
|
673 return myInfo.clone(); |
|
674 } |
|
675 |
|
676 private static boolean match(ListenerInfo li, |
|
677 ObjectName name, |
|
678 NotificationListener listener, |
|
679 boolean subscribed) { |
|
680 return li.getObjectName().equals(name) && |
|
681 li.getListener() == listener && |
|
682 li.isSubscription() == subscribed; |
|
683 } |
|
684 |
|
685 private static boolean match(ListenerInfo li, |
|
686 ObjectName name, |
|
687 NotificationListener listener, |
|
688 NotificationFilter filter, |
|
689 Object handback, |
|
690 boolean subscribed) { |
|
691 return li.getObjectName().equals(name) && |
|
692 li.getFilter() == filter && |
|
693 li.getListener() == listener && |
|
694 li.getHandback() == handback && |
|
695 li.isSubscription() == subscribed; |
|
696 } |
|
697 |
|
698 // --------------------------------------------------- |
|
699 // private classes |
|
700 // --------------------------------------------------- |
|
701 private class DispatchingJob extends RepeatedSingletonJob { |
|
702 public DispatchingJob() { |
|
703 super(distributingExecutor); |
|
704 } |
|
705 |
|
706 public boolean isSuspended() { |
|
707 return closed || buffer.size() == 0; |
|
708 } |
|
709 |
|
710 public void task() { |
|
711 TargetedNotification[] tns ; |
|
712 int lost = 0; |
|
713 |
|
714 synchronized(buffer) { |
|
715 tns = buffer.removeNotifs(); |
|
716 lost = buffer.removeLost(); |
|
717 } |
|
718 |
|
719 if ((tns == null || tns.length == 0) |
|
720 && lost == 0) { |
|
721 return; |
|
722 } |
|
723 |
|
724 // forwarding |
|
725 if (tns != null && tns.length > 0) { |
|
726 if (logger.traceOn()) { |
|
727 logger.trace("DispatchingJob-task", |
|
728 "Forwarding: "+tns.length); |
|
729 } |
|
730 for (TargetedNotification tn : tns) { |
|
731 final ListenerInfo li = listenerInfoMap.get(tn.getListenerID()); |
|
732 try { |
|
733 li.getListener().handleNotification(tn.getNotification(), |
|
734 li.getHandback()); |
|
735 } catch (Exception e) { |
|
736 logger.fine( |
|
737 "DispatchingJob.task", "listener got exception", e); |
|
738 } |
|
739 } |
|
740 } |
|
741 |
|
742 if (lost > 0) { |
|
743 if (logger.traceOn()) { |
|
744 logger.trace("DispatchingJob-task", |
|
745 "lost: "+lost); |
|
746 } |
|
747 final Notification n = new Notification(NOTIFS_LOST, |
|
748 EventClient.this, |
|
749 myNotifCounter.getAndIncrement(), |
|
750 System.currentTimeMillis(), |
|
751 "Lost notifications."); |
|
752 n.setUserData(new Long(lost)); |
|
753 broadcaster.sendNotification(n); |
|
754 } |
|
755 } |
|
756 } |
|
757 |
|
758 |
|
759 private class EventReceiverImpl implements EventReceiver { |
|
760 public void receive(NotificationResult nr) { |
|
761 if (logger.traceOn()) { |
|
762 logger.trace("MyEventReceiver-receive", ""); |
|
763 } |
|
764 |
|
765 synchronized(buffer) { |
|
766 buffer.addNotifs(nr); |
|
767 |
|
768 dispatchingJob.resume(); |
|
769 } |
|
770 } |
|
771 |
|
772 public void failed(Throwable t) { |
|
773 if (logger.traceOn()) { |
|
774 logger.trace("MyEventReceiver-failed", "", t); |
|
775 } |
|
776 final Notification n = new Notification(FAILED, |
|
777 this, |
|
778 myNotifCounter.getAndIncrement(), |
|
779 System.currentTimeMillis()); |
|
780 n.setSource(t); |
|
781 broadcaster.sendNotification(n); |
|
782 } |
|
783 |
|
784 public void nonFatal(Exception e) { |
|
785 if (logger.traceOn()) { |
|
786 logger.trace("MyEventReceiver-nonFatal", "", e); |
|
787 } |
|
788 |
|
789 final Notification n = new Notification(NONFATAL, |
|
790 this, |
|
791 myNotifCounter.getAndIncrement(), |
|
792 System.currentTimeMillis()); |
|
793 n.setSource(e); |
|
794 broadcaster.sendNotification(n); |
|
795 } |
|
796 } |
|
797 |
|
798 // ---------------------------------------------------- |
|
799 // private class |
|
800 // ---------------------------------------------------- |
|
801 |
|
802 |
|
803 // ---------------------------------------------------- |
|
804 // private methods |
|
805 // ---------------------------------------------------- |
|
806 private Integer getListenerInfo(ObjectName name, |
|
807 NotificationListener listener, |
|
808 NotificationFilter filter, |
|
809 Object handback, |
|
810 boolean subscribed) throws ListenerNotFoundException { |
|
811 |
|
812 synchronized(listenerInfoMap) { |
|
813 for (Map.Entry<Integer, ListenerInfo> entry : |
|
814 listenerInfoMap.entrySet()) { |
|
815 ListenerInfo li = entry.getValue(); |
|
816 if (match(li, name, listener, filter, handback, subscribed)) { |
|
817 return entry.getKey(); |
|
818 } |
|
819 } |
|
820 } |
|
821 |
|
822 throw new ListenerNotFoundException(); |
|
823 } |
|
824 |
|
825 private Integer getMatchedListenerInfo(ObjectName name, |
|
826 NotificationListener listener, |
|
827 boolean subscribed) throws ListenerNotFoundException { |
|
828 |
|
829 synchronized(listenerInfoMap) { |
|
830 for (Map.Entry<Integer, ListenerInfo> entry : |
|
831 listenerInfoMap.entrySet()) { |
|
832 ListenerInfo li = entry.getValue(); |
|
833 if (li.getObjectName().equals(name) && |
|
834 li.getListener() == listener && |
|
835 li.isSubscription() == subscribed) { |
|
836 return entry.getKey(); |
|
837 } |
|
838 } |
|
839 } |
|
840 |
|
841 throw new ListenerNotFoundException(); |
|
842 } |
|
843 |
|
844 private Collection<Integer> getListenerInfo(ObjectName name, |
|
845 NotificationListener listener, |
|
846 boolean subscribed) throws ListenerNotFoundException { |
|
847 |
|
848 final ArrayList<Integer> ids = new ArrayList<Integer>(); |
|
849 synchronized(listenerInfoMap) { |
|
850 for (Map.Entry<Integer, ListenerInfo> entry : |
|
851 listenerInfoMap.entrySet()) { |
|
852 ListenerInfo li = entry.getValue(); |
|
853 if (match(li, name, listener, subscribed)) { |
|
854 ids.add(entry.getKey()); |
|
855 } |
|
856 } |
|
857 } |
|
858 |
|
859 if (ids.isEmpty()) { |
|
860 throw new ListenerNotFoundException(); |
|
861 } |
|
862 |
|
863 return ids; |
|
864 } |
|
865 |
|
866 private void checkState() throws IOException { |
|
867 synchronized(listenerInfoMap) { |
|
868 if (closed) { |
|
869 throw new IOException("Ended!"); |
|
870 } |
|
871 } |
|
872 } |
|
873 |
|
874 private void startListening() throws IOException { |
|
875 synchronized(listenerInfoMap) { |
|
876 if (!startedListening && listenerInfoMap.size() > 0) { |
|
877 eventRelay.setEventReceiver(myReceiver); |
|
878 } |
|
879 |
|
880 startedListening = true; |
|
881 |
|
882 if (logger.traceOn()) { |
|
883 logger.trace("startListening", "listening"); |
|
884 } |
|
885 } |
|
886 } |
|
887 |
|
888 private void stopListening() throws IOException { |
|
889 synchronized(listenerInfoMap) { |
|
890 if (listenerInfoMap.size() == 0 && startedListening) { |
|
891 eventRelay.setEventReceiver(null); |
|
892 |
|
893 startedListening = false; |
|
894 |
|
895 if (logger.traceOn()) { |
|
896 logger.trace("stopListening", "non listening"); |
|
897 } |
|
898 } |
|
899 } |
|
900 } |
|
901 |
|
902 private void removeListener(Integer id) |
|
903 throws InstanceNotFoundException, |
|
904 ListenerNotFoundException, |
|
905 IOException { |
|
906 synchronized(listenerInfoMap) { |
|
907 if (listenerInfoMap.remove(id) == null) { |
|
908 throw new ListenerNotFoundException(); |
|
909 } |
|
910 |
|
911 stopListening(); |
|
912 } |
|
913 |
|
914 try { |
|
915 eventClientDelegate.removeListenerOrSubscriber(clientId, id); |
|
916 } catch (EventClientNotFoundException cnfe) { |
|
917 logger.trace("removeListener", "ecd.removeListener", cnfe); |
|
918 } |
|
919 } |
|
920 |
|
921 |
|
922 // ---------------------------------------------------- |
|
923 // private variables |
|
924 // ---------------------------------------------------- |
|
925 private static final ClassLogger logger = |
|
926 new ClassLogger("javax.management.event", "EventClient"); |
|
927 |
|
928 private final Executor distributingExecutor; |
|
929 private final EventClientDelegateMBean eventClientDelegate; |
|
930 private final EventRelay eventRelay; |
|
931 private volatile String clientId = null; |
|
932 private final long requestedLeaseTime; |
|
933 |
|
934 private final ReceiverBuffer buffer = new ReceiverBuffer(); |
|
935 |
|
936 private final EventReceiverImpl myReceiver = |
|
937 new EventReceiverImpl(); |
|
938 private final DispatchingJob dispatchingJob; |
|
939 |
|
940 private final HashMap<Integer, ListenerInfo> listenerInfoMap = |
|
941 new HashMap<Integer, ListenerInfo>(); |
|
942 |
|
943 private volatile boolean closed = false; |
|
944 |
|
945 private volatile boolean startedListening = false; |
|
946 |
|
947 // Could change synchronization here. But at worst a race will mean |
|
948 // sequence numbers are not contiguous, which may not matter much. |
|
949 private final AtomicLong myNotifCounter = new AtomicLong(); |
|
950 |
|
951 private final static MBeanNotificationInfo[] myInfo = |
|
952 new MBeanNotificationInfo[] { |
|
953 new MBeanNotificationInfo( |
|
954 new String[] {FAILED, NONFATAL, NOTIFS_LOST}, |
|
955 Notification.class.getName(), |
|
956 "Notifications that can be sent to a listener added with " + |
|
957 "EventClient.addEventClientListener")}; |
|
958 |
|
959 private final NotificationBroadcasterSupport broadcaster = |
|
960 new NotificationBroadcasterSupport(); |
|
961 |
|
962 private final static Executor callerExecutor = new Executor() { |
|
963 // DirectExecutor using caller thread |
|
964 public void execute(Runnable r) { |
|
965 r.run(); |
|
966 } |
|
967 }; |
|
968 |
|
969 private static void checkInit(final MBeanServerConnection conn, |
|
970 final ObjectName delegateName) |
|
971 throws IOException { |
|
972 if (conn == null) { |
|
973 throw new IllegalArgumentException("No connection specified"); |
|
974 } |
|
975 if (delegateName != null && |
|
976 (!conn.isRegistered(delegateName))) { |
|
977 throw new IllegalArgumentException( |
|
978 delegateName + |
|
979 ": not found"); |
|
980 } |
|
981 if (delegateName == null && |
|
982 (!conn.isRegistered( |
|
983 EventClientDelegate.OBJECT_NAME))) { |
|
984 throw new IllegalArgumentException( |
|
985 EventClientDelegate.OBJECT_NAME + |
|
986 ": not found"); |
|
987 } |
|
988 } |
|
989 |
|
990 // ---------------------------------------------------- |
|
991 // private event lease issues |
|
992 // ---------------------------------------------------- |
|
993 private Callable<Long> renewLease = new Callable<Long>() { |
|
994 public Long call() throws IOException, EventClientNotFoundException { |
|
995 return eventClientDelegate.lease(clientId, requestedLeaseTime); |
|
996 } |
|
997 }; |
|
998 |
|
999 private final LeaseRenewer leaseRenewer; |
|
1000 |
|
1001 // ------------------------------------------------------------------------ |
|
1002 /** |
|
1003 * Constructs an {@code MBeanServerConnection} that uses an {@code EventClient} object, |
|
1004 * if the underlying connection has an {@link EventClientDelegateMBean}. |
|
1005 * <P> The {@code EventClient} object creates a default |
|
1006 * {@link FetchingEventRelay} object to |
|
1007 * receive notifications forwarded by the {@link EventClientDelegateMBean}. |
|
1008 * The {@link EventClientDelegateMBean} it works with is the |
|
1009 * default one registered with the ObjectName |
|
1010 * {@link EventClientDelegate#OBJECT_NAME |
|
1011 * OBJECT_NAME}. |
|
1012 * The thread from the {@link FetchingEventRelay} object that fetches the |
|
1013 * notifications is also used to distribute them. |
|
1014 * |
|
1015 * @param conn An {@link MBeanServerConnection} object used to communicate |
|
1016 * with an {@link EventClientDelegateMBean}. |
|
1017 * @throws IllegalArgumentException If the value of {@code conn} is null, |
|
1018 * or the default {@link EventClientDelegateMBean} is not registered. |
|
1019 * @throws IOException If an I/O error occurs. |
|
1020 */ |
|
1021 public static MBeanServerConnection getEventClientConnection( |
|
1022 final MBeanServerConnection conn) |
|
1023 throws IOException { |
|
1024 return getEventClientConnection(conn, null); |
|
1025 } |
|
1026 |
|
1027 /** |
|
1028 * Constructs an MBeanServerConnection that uses an {@code EventClient} |
|
1029 * object with a user-specific {@link EventRelay} |
|
1030 * object. |
|
1031 * <P> |
|
1032 * The {@link EventClientDelegateMBean} which it works with is the |
|
1033 * default one registered with the ObjectName |
|
1034 * {@link EventClientDelegate#OBJECT_NAME |
|
1035 * OBJECT_NAME} |
|
1036 * The thread that calls {@link EventReceiver#receive |
|
1037 * EventReceiver.receive} from the {@link EventRelay} object is used |
|
1038 * to distribute notifications to their listeners. |
|
1039 * |
|
1040 * @param conn An {@link MBeanServerConnection} object used to communicate |
|
1041 * with an {@link EventClientDelegateMBean}. |
|
1042 * @param eventRelay A user-specific object used to receive notifications |
|
1043 * forwarded by the {@link EventClientDelegateMBean}. If null, the default |
|
1044 * {@link FetchingEventRelay} object is used. |
|
1045 * @throws IllegalArgumentException If the value of {@code conn} is null, |
|
1046 * or the default {@link EventClientDelegateMBean} is not registered. |
|
1047 * @throws IOException If an I/O error occurs. |
|
1048 */ |
|
1049 public static MBeanServerConnection getEventClientConnection( |
|
1050 final MBeanServerConnection conn, |
|
1051 final EventRelay eventRelay) |
|
1052 throws IOException { |
|
1053 |
|
1054 if (newEventConn == null) { |
|
1055 throw new IllegalArgumentException( |
|
1056 "Class not found: EventClientConnection"); |
|
1057 } |
|
1058 |
|
1059 checkInit(conn,null); |
|
1060 final Callable<EventClient> factory = new Callable<EventClient>() { |
|
1061 final public EventClient call() throws Exception { |
|
1062 EventClientDelegateMBean ecd = EventClientDelegate.getProxy(conn); |
|
1063 return new EventClient(ecd, eventRelay, null, null, |
|
1064 DEFAULT_REQUESTED_LEASE_TIME); |
|
1065 } |
|
1066 }; |
|
1067 |
|
1068 try { |
|
1069 return (MBeanServerConnection)newEventConn.invoke(null, |
|
1070 conn, factory); |
|
1071 } catch (Exception e) { |
|
1072 throw new IllegalArgumentException(e); |
|
1073 } |
|
1074 } |
|
1075 |
|
1076 private static Method newEventConn = null; |
|
1077 static { |
|
1078 try { |
|
1079 Class<?> c = Class.forName( |
|
1080 "com.sun.jmx.remote.util.EventClientConnection", |
|
1081 false, Thread.currentThread().getContextClassLoader()); |
|
1082 newEventConn = c.getMethod("getEventConnectionFor", |
|
1083 MBeanServerConnection.class, Callable.class); |
|
1084 } catch (Exception e) { |
|
1085 // OK: we're running in a subset of our classes |
|
1086 } |
|
1087 } |
|
1088 |
|
1089 /** |
|
1090 * <p>Get the client id of this {@code EventClient} in the |
|
1091 * {@link EventClientDelegateMBean}. |
|
1092 * |
|
1093 * @return the client id. |
|
1094 * |
|
1095 * @see EventClientDelegateMBean#addClient(String, Object[], String[]) |
|
1096 * EventClientDelegateMBean.addClient |
|
1097 */ |
|
1098 public String getClientId() { |
|
1099 return clientId; |
|
1100 } |
|
1101 |
|
1102 private static final PerThreadGroupPool<ScheduledThreadPoolExecutor> |
|
1103 leaseRenewerThreadPool = PerThreadGroupPool.make(); |
|
1104 } |
|