29 import com.sun.jmx.event.RepeatedSingletonJob; |
29 import com.sun.jmx.event.RepeatedSingletonJob; |
30 import com.sun.jmx.remote.util.ClassLogger; |
30 import com.sun.jmx.remote.util.ClassLogger; |
31 import java.io.IOException; |
31 import java.io.IOException; |
32 import java.io.NotSerializableException; |
32 import java.io.NotSerializableException; |
33 import java.util.concurrent.Executor; |
33 import java.util.concurrent.Executor; |
34 import java.util.concurrent.Executors; |
34 import java.util.concurrent.ExecutorService; |
35 import java.util.concurrent.RejectedExecutionException; |
35 import java.util.concurrent.ScheduledThreadPoolExecutor; |
36 import java.util.concurrent.ScheduledExecutorService; |
|
37 import java.util.concurrent.ScheduledFuture; |
|
38 import java.util.concurrent.ThreadFactory; |
36 import java.util.concurrent.ThreadFactory; |
39 import java.util.concurrent.TimeUnit; |
37 import java.util.concurrent.TimeUnit; |
40 import javax.management.MBeanException; |
38 import javax.management.MBeanException; |
41 import javax.management.remote.NotificationResult; |
39 import javax.management.remote.NotificationResult; |
42 |
40 |
213 this.delegate = delegate; |
211 this.delegate = delegate; |
214 this.timeout = timeout; |
212 this.timeout = timeout; |
215 this.maxNotifs = maxNotifs; |
213 this.maxNotifs = maxNotifs; |
216 |
214 |
217 if (executor == null) { |
215 if (executor == null) { |
218 executor = Executors.newSingleThreadScheduledExecutor( |
216 ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, |
219 daemonThreadFactory); |
217 daemonThreadFactory); |
220 } |
218 stpe.setKeepAliveTime(1, TimeUnit.SECONDS); |
|
219 stpe.allowCoreThreadTimeOut(true); |
|
220 executor = stpe; |
|
221 this.defaultExecutor = stpe; |
|
222 } else |
|
223 this.defaultExecutor = null; |
221 this.executor = executor; |
224 this.executor = executor; |
222 if (executor instanceof ScheduledExecutorService) |
|
223 leaseScheduler = (ScheduledExecutorService) executor; |
|
224 else { |
|
225 leaseScheduler = Executors.newSingleThreadScheduledExecutor( |
|
226 daemonThreadFactory); |
|
227 } |
|
228 |
225 |
229 startSequenceNumber = 0; |
226 startSequenceNumber = 0; |
230 fetchingJob = new MyJob(); |
227 fetchingJob = new MyJob(); |
231 } |
228 } |
232 |
229 |
233 public void setEventReceiver(EventReceiver eventReceiver) { |
230 public synchronized void setEventReceiver(EventReceiver eventReceiver) { |
234 if (logger.traceOn()) { |
231 if (logger.traceOn()) { |
235 logger.trace("setEventReceiver", ""+eventReceiver); |
232 logger.trace("setEventReceiver", ""+eventReceiver); |
236 } |
233 } |
237 |
234 |
238 EventReceiver old = this.eventReceiver; |
235 EventReceiver old = this.eventReceiver; |
239 synchronized(fetchingJob) { |
236 this.eventReceiver = eventReceiver; |
240 this.eventReceiver = eventReceiver; |
237 if (old == null && eventReceiver != null) |
241 if (old == null && eventReceiver != null) |
238 fetchingJob.resume(); |
242 fetchingJob.resume(); |
|
243 } |
|
244 } |
239 } |
245 |
240 |
246 public String getClientId() { |
241 public String getClientId() { |
247 return clientId; |
242 return clientId; |
248 } |
243 } |
249 |
244 |
250 public void stop() { |
245 public synchronized void stop() { |
251 if (logger.traceOn()) { |
246 if (logger.traceOn()) { |
252 logger.trace("stop", ""); |
247 logger.trace("stop", ""); |
253 } |
248 } |
254 synchronized(fetchingJob) { |
249 if (stopped) { |
255 if (stopped) { |
250 return; |
256 return; |
251 } |
257 } |
252 |
258 |
253 stopped = true; |
259 stopped = true; |
254 clientId = null; |
260 clientId = null; |
255 if (defaultExecutor != null) |
261 } |
256 defaultExecutor.shutdown(); |
262 } |
257 } |
263 |
258 |
264 private class MyJob extends RepeatedSingletonJob { |
259 private class MyJob extends RepeatedSingletonJob { |
265 public MyJob() { |
260 public MyJob() { |
266 super(executor); |
261 super(executor); |
370 private long startSequenceNumber = 0; |
365 private long startSequenceNumber = 0; |
371 private EventReceiver eventReceiver = null; |
366 private EventReceiver eventReceiver = null; |
372 private final EventClientDelegateMBean delegate; |
367 private final EventClientDelegateMBean delegate; |
373 private String clientId; |
368 private String clientId; |
374 private boolean stopped = false; |
369 private boolean stopped = false; |
375 private volatile ScheduledFuture<?> leaseRenewalFuture; |
|
376 |
370 |
377 private final Executor executor; |
371 private final Executor executor; |
378 private final ScheduledExecutorService leaseScheduler; |
372 private final ExecutorService defaultExecutor; |
379 private final MyJob fetchingJob; |
373 private final MyJob fetchingJob; |
380 |
374 |
381 private final long timeout; |
375 private final long timeout; |
382 private final int maxNotifs; |
376 private final int maxNotifs; |
383 |
377 |
384 private static final ClassLogger logger = |
378 private static final ClassLogger logger = |
385 new ClassLogger("javax.management.event", |
379 new ClassLogger("javax.management.event", |
386 "FetchingEventRelay"); |
380 "FetchingEventRelay"); |
387 private static final ThreadFactory daemonThreadFactory = |
381 private static final ThreadFactory daemonThreadFactory = |
388 new DaemonThreadFactory("FetchingEventRelay-executor"); |
382 new DaemonThreadFactory("JMX FetchingEventRelay executor %d"); |
389 } |
383 } |