64 public static final class StreamConfiguration { |
64 public static final class StreamConfiguration { |
65 private static final Runnable[] NO_ACTIONS = new Runnable[0]; |
65 private static final Runnable[] NO_ACTIONS = new Runnable[0]; |
66 |
66 |
67 private Runnable[] flushActions = NO_ACTIONS; |
67 private Runnable[] flushActions = NO_ACTIONS; |
68 private Runnable[] closeActions = NO_ACTIONS; |
68 private Runnable[] closeActions = NO_ACTIONS; |
|
69 private Runnable[] errorActions = NO_ACTIONS; |
|
70 |
69 private EventDispatcher[] dispatchers = NO_DISPATCHERS; |
71 private EventDispatcher[] dispatchers = NO_DISPATCHERS; |
70 private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; |
72 private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; |
|
73 private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>(); |
|
74 private boolean changedConfiguration = false; |
71 private boolean closed = false; |
75 private boolean closed = false; |
72 private boolean reuse = true; |
76 private boolean reuse = true; |
73 private boolean ordered = true; |
77 private boolean ordered = true; |
74 private Instant startTime = null; |
78 private Instant startTime = null; |
75 private Instant endTime = null; |
79 private Instant endTime = null; |
76 private boolean started = false; |
80 private boolean started = false; |
77 private long startNanos = 0; |
81 private long startNanos = 0; |
78 private long endNanos = Long.MAX_VALUE; |
82 private long endNanos = Long.MAX_VALUE; |
79 private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>(); |
|
80 private boolean changed = false; |
|
81 |
83 |
82 public StreamConfiguration(StreamConfiguration configuration) { |
84 public StreamConfiguration(StreamConfiguration configuration) { |
83 this.flushActions = configuration.flushActions; |
85 this.flushActions = configuration.flushActions; |
84 this.closeActions = configuration.closeActions; |
86 this.closeActions = configuration.closeActions; |
85 this.dispatchers = configuration.dispatchers; |
87 this.dispatchers = configuration.dispatchers; |
// Appends `action` to closeActions via add(...) and returns this configuration
// so calls can be chained. (Diff view: each statement is listed once per revision.)
120 final public StreamConfiguration addCloseAction(Runnable action) { |
122 final public StreamConfiguration addCloseAction(Runnable action) { |
121 closeActions = add(closeActions, action); |
123 closeActions = add(closeActions, action); |
122 return this; |
124 return this; |
123 } |
125 } |
124 |
126 |
|
// Appends `action` to errorActions via add(...) and returns this configuration.
// Appears in only one of the two revisions shown; unlike the sibling
// addCloseAction it is not declared final.
127 public StreamConfiguration addErrorAction(Runnable action) { |
|
128 errorActions = add(errorActions, action); |
|
129 return this; |
|
130 } |
|
131 |
// Stores the closed flag, marks the configuration as modified and returns this.
// The modification flag is named `changed` in one revision and
// `changedConfiguration` in the other.
125 final public StreamConfiguration setClosed(boolean closed) { |
132 final public StreamConfiguration setClosed(boolean closed) { |
126 this.closed = closed; |
133 this.closed = closed; |
127 changed = true; |
134 changedConfiguration = true; |
128 return this; |
135 return this; |
129 } |
136 } |
130 |
137 |
131 final public boolean isClosed() { |
138 final public boolean isClosed() { |
132 return closed; |
139 return closed; |
152 } |
159 } |
153 EventDispatcher[] result = list.toArray(new EventDispatcher[0]); |
160 EventDispatcher[] result = list.toArray(new EventDispatcher[0]); |
154 if (modified) { |
161 if (modified) { |
155 eventFilter = buildFilter(result); |
162 eventFilter = buildFilter(result); |
156 dispatcherLookup = new LongMap<>(); |
163 dispatcherLookup = new LongMap<>(); |
157 changed = true; |
164 changedConfiguration = true; |
158 } |
165 } |
159 return result; |
166 return result; |
160 } |
167 } |
161 |
168 |
// Copies every element of `array` that is not the same reference as `action`
// (identity comparison, not equals) into a list, and sets the modification
// flag (`changed` / `changedConfiguration`, depending on revision) when at
// least one element was dropped.
// NOTE(review): the result is produced with list.toArray(array). Per the
// List.toArray(T[]) contract, a list that fits in the supplied array is
// written into that same array and the slot after the last element is set to
// null. So whenever something was removed, the caller's original array is
// overwritten and returned at its original length with a trailing null,
// rather than a shorter copy. Confirm callers tolerate this, or that a
// fresh array was intended.
162 private <T> T[] remove(T[] array, Object action) { |
169 private <T> T[] remove(T[] array, Object action) { |
163 List<T> list = new ArrayList<>(array.length); |
170 List<T> list = new ArrayList<>(array.length); |
164 for (int i = 0; i < array.length; i++) { |
171 for (int i = 0; i < array.length; i++) { |
165 if (array[i] != action) { |
172 if (array[i] != action) { |
166 list.add(array[i]); |
173 list.add(array[i]); |
167 } else { |
174 } else { |
168 changed = true; |
175 changedConfiguration = true; |
169 } |
176 } |
170 } |
177 } |
171 return list.toArray(array); |
178 return list.toArray(array); |
172 } |
179 } |
173 |
180 |
// Returns an array holding the elements of `array` followed by `object`, and
// unconditionally sets the modification flag (`changed` /
// `changedConfiguration`, depending on revision).
// The list is always one element longer than `array`, so list.toArray(array)
// cannot reuse the argument and allocates a new array of the same runtime type.
174 private <T> T[] add(T[] array, T object) { |
181 private <T> T[] add(T[] array, T object) { |
175 List<T> list = new ArrayList<>(Arrays.asList(array)); |
182 List<T> list = new ArrayList<>(Arrays.asList(array)); |
176 list.add(object); |
183 list.add(object); |
177 changed = true; |
184 changedConfiguration = true; |
178 return list.toArray(array); |
185 return list.toArray(array); |
179 } |
186 } |
180 |
187 |
181 private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { |
188 private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { |
182 InternalEventFilter ef = new InternalEventFilter(); |
189 InternalEventFilter ef = new InternalEventFilter(); |
190 return ef; |
197 return ef; |
191 } |
198 } |
192 |
199 |
// Stores the reuse flag, marks the configuration as modified and returns this
// for chaining. The flag is set even when the value is unchanged.
193 final public StreamConfiguration setReuse(boolean reuse) { |
200 final public StreamConfiguration setReuse(boolean reuse) { |
194 this.reuse = reuse; |
201 this.reuse = reuse; |
195 changed = true; |
202 changedConfiguration = true; |
196 return this; |
203 return this; |
197 } |
204 } |
198 |
205 |
// Stores the ordered flag, marks the configuration as modified and returns
// this for chaining.
199 final public StreamConfiguration setOrdered(boolean ordered) { |
206 final public StreamConfiguration setOrdered(boolean ordered) { |
200 this.ordered = ordered; |
207 this.ordered = ordered; |
201 changed = true; |
208 changedConfiguration = true; |
202 return this; |
209 return this; |
203 } |
210 } |
|
211 |
// Stores the end time both as an Instant and as nanoseconds (converted with
// Utils.timeToNanos), marks the configuration as modified and returns this.
// Unlike the neighbouring setters this one is not declared final.
// NOTE(review): behaviour for a null endTime depends on Utils.timeToNanos,
// which is not visible here.
204 public StreamConfiguration setEndTime(Instant endTime) { |
212 public StreamConfiguration setEndTime(Instant endTime) { |
205 this.endTime = endTime; |
213 this.endTime = endTime; |
206 this.endNanos = Utils.timeToNanos(endTime); |
214 this.endNanos = Utils.timeToNanos(endTime); |
207 changed = true; |
215 changedConfiguration = true; |
208 return this; |
216 return this; |
209 } |
217 } |
210 |
218 |
// Stores the start time both as an Instant and as nanoseconds (converted with
// Utils.timeToNanos), marks the configuration as modified and returns this.
// Note that this overwrites any value previously set through setStartNanos.
211 final public StreamConfiguration setStartTime(Instant startTime) { |
219 final public StreamConfiguration setStartTime(Instant startTime) { |
212 this.startTime = startTime; |
220 this.startTime = startTime; |
213 this.startNanos = Utils.timeToNanos(startTime); |
221 this.startNanos = Utils.timeToNanos(startTime); |
214 changed = true; |
222 changedConfiguration = true; |
215 return this; |
223 return this; |
216 } |
224 } |
217 |
225 |
218 final public Instant getStartTime() { |
226 final public Instant getStartTime() { |
219 return startTime; |
227 return startTime; |
227 return started; |
235 return started; |
228 } |
236 } |
229 |
237 |
// Stores the start position in nanoseconds, marks the configuration as
// modified and returns this. The startTime Instant is left untouched.
230 final public StreamConfiguration setStartNanos(long startNanos) { |
238 final public StreamConfiguration setStartNanos(long startNanos) { |
231 this.startNanos = startNanos; |
239 this.startNanos = startNanos; |
232 changed = true; |
240 changedConfiguration = true; |
233 return this; |
241 return this; |
234 } |
242 } |
235 |
243 |
// Stores the started flag and marks the configuration as modified.
// Returns void, so unlike the other setters it cannot be chained.
236 final public void setStarted(boolean started) { |
244 final public void setStarted(boolean started) { |
237 this.started = started; |
245 this.started = started; |
238 changed = true; |
246 changedConfiguration = true; |
239 } |
247 } |
240 |
248 |
// Reports whether any mutator has set the modification flag on this
// configuration (field `changed` in one revision, `changedConfiguration` in
// the other).
241 final public boolean hasChanged() { |
249 final public boolean hasChanged() { |
242 return changed; |
250 return changedConfiguration; |
243 } |
251 } |
244 |
252 |
// Returns the current value of the reuse flag.
245 final public boolean getReuse() { |
253 final public boolean getReuse() { |
246 return reuse; |
254 return reuse; |
247 } |
255 } |
// Per-stream state. (Diff view: each declaration is listed once per revision.)
327 private final AccessControlContext accessControlContext; |
331 private final AccessControlContext accessControlContext; |
328 private final Thread thread; |
332 private final Thread thread; |
329 private final boolean active; |
333 private final boolean active; |
// Runnable that simply invokes runFlushActions() on this stream.
330 protected final Runnable flushOperation = () -> runFlushActions(); |
334 protected final Runnable flushOperation = () -> runFlushActions(); |
331 |
335 |
332 // Updated by updateConfiguration() |
336 // Modified by updateConfiguration() |
// Volatile reference; mutators build a modified copy and pass it to
// updateConfiguration(...) rather than changing this object in place.
333 protected volatile StreamConfiguration configuration = new StreamConfiguration(); |
337 protected volatile StreamConfiguration configuration = new StreamConfiguration(); |
334 |
338 |
335 // Cache the last event type and dispatch. |
339 // Cache the last event type and dispatch. |
// Both cache fields are reset to null by clearLastDispatch().
336 private EventType lastEventType; |
340 private EventType lastEventType; |
337 private EventDispatcher[] lastEventDispatch; |
341 private EventDispatcher[] lastEventDispatch; |
350 public Void run() { |
354 public Void run() { |
351 execute(); |
355 execute(); |
352 return null; |
356 return null; |
353 } |
357 } |
354 }, accessControlContext); |
358 }, accessControlContext); |
355 |
|
356 } |
359 } |
357 |
360 |
// Runs process() on the current thread after passing that thread to
// JVM.getJVM().exclude(...), and always logs a DEBUG message when processing
// ends. The two revisions shown differ in error handling:
//  - one catches IOException separately and logs it only if the stream is not
//    closed, sending every other Exception to logException(e);
//  - the other has a single catch (Exception) that calls defaultErrorHandler(e).
// Only Exception is caught; an Error still propagates after the finally block.
358 private void execute() { |
361 private void execute() { |
359 JVM.getJVM().exclude(Thread.currentThread()); |
362 JVM.getJVM().exclude(Thread.currentThread()); |
360 try { |
363 try { |
361 process(); |
364 process(); |
362 } catch (IOException e) { |
|
363 if (!isClosed()) { |
|
364 logException(e); |
|
365 } |
|
366 } catch (Exception e) { |
365 } catch (Exception e) { |
367 logException(e); |
366 defaultErrorHandler(e); |
368 } finally { |
367 } finally { |
369 Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); |
368 Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); |
370 } |
369 } |
371 } |
370 } |
372 |
371 |
// logException exists in only one of the two revisions shown: it prints the
// stack trace and logs a DEBUG message containing e.getMessage(). The other
// revision has no such helper; its call sites use handleError /
// defaultErrorHandler instead.
// The interleaved process() declarations show the abstract hook's throws
// clause differing between revisions (Exception vs IOException).
373 private void logException(Exception e) { |
372 public abstract void process() throws Exception; |
374 // FIXME: e.printStackTrace(); for debugging purposes, |
|
375 // remove before integration |
|
376 e.printStackTrace(); |
|
377 Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage()); |
|
378 } |
|
379 |
|
380 public abstract void process() throws IOException; |
|
381 |
373 |
// Drops the cached (event type, dispatcher array) pair by setting both
// fields to null.
382 protected final void clearLastDispatch() { |
374 protected final void clearLastDispatch() { |
383 lastEventDispatch = null; |
375 lastEventDispatch = null; |
384 lastEventType = null; |
376 lastEventType = null; |
385 } |
377 } |
405 } |
397 } |
406 for (int i = 0; i < ret.length; i++) { |
398 for (int i = 0; i < ret.length; i++) { |
407 try { |
399 try { |
408 ret[i].offer(event); |
400 ret[i].offer(event); |
409 } catch (Exception e) { |
401 } catch (Exception e) { |
410 logException(e); |
402 handleError(e); |
411 } |
403 } |
412 } |
404 } |
|
405 } |
|
406 |
|
// Central error hook (present in only one of the two revisions shown).
// Reads the volatile `configuration` once into a local so that the emptiness
// check and the loop see the same errorActions array.
// With no registered error actions it falls back to defaultErrorHandler(e);
// otherwise it runs every registered action in order.
// NOTE(review): the actions are plain Runnables, so `e` is never handed to
// them, and it is not printed or logged on this path. An exception thrown by
// an action is not caught here and aborts the remaining actions.
407 protected final void handleError(Throwable e) { |
|
408 StreamConfiguration c = configuration; |
|
409 if (c.errorActions.length == 0) { |
|
410 defaultErrorHandler(e); |
|
411 return; |
|
412 } |
|
413 for (Runnable r : c.errorActions) { |
|
414 r.run(); |
|
415 } |
|
416 } |
|
417 |
|
// Fallback error handling: prints the throwable's stack trace.
// (The first closing-brace entry below belongs to the other revision's
// preceding method in this diff view.)
418 protected final void defaultErrorHandler(Throwable e) { |
|
419 e.printStackTrace(); |
413 } |
420 } |
414 |
421 |
// Runs every close action from the current configuration, in array order.
// Each action runs in its own try/catch, so an Exception from one action does
// not stop the rest; the exception goes to logException(e) in one revision
// and to handleError(e) in the other. Errors are not caught.
415 public final void runCloseActions() { |
422 public final void runCloseActions() { |
416 Runnable[] cas = configuration.getCloseActions(); |
423 Runnable[] cas = configuration.getCloseActions(); |
417 for (int i = 0; i < cas.length; i++) { |
424 for (int i = 0; i < cas.length; i++) { |
418 try { |
425 try { |
419 cas[i].run(); |
426 cas[i].run(); |
420 } catch (Exception e) { |
427 } catch (Exception e) { |
421 logException(e); |
428 handleError(e); |
422 } |
429 } |
423 } |
430 } |
424 } |
431 } |
425 |
432 |
// Runs every flush action from the current configuration, in array order.
// Each action runs in its own try/catch, so an Exception from one action does
// not stop the rest; the exception goes to logException(e) in one revision
// and to handleError(e) in the other. Errors are not caught.
426 public final void runFlushActions() { |
433 public final void runFlushActions() { |
427 Runnable[] fas = configuration.getFlushActions(); |
434 Runnable[] fas = configuration.getFlushActions(); |
428 for (int i = 0; i < fas.length; i++) { |
435 for (int i = 0; i < fas.length; i++) { |
429 try { |
436 try { |
430 fas[i].run(); |
437 fas[i].run(); |
431 } catch (Exception e) { |
438 } catch (Exception e) { |
432 logException(e); |
439 handleError(e); |
433 } |
440 } |
434 } |
441 } |
|
442 |
435 } |
443 } |
436 |
444 |
437 // Purpose of synchronizing the following methods is |
445 // Purpose of synchronizing the following methods is |
438 // to serialize changes to the configuration, so only one |
446 // to serialize changes to the configuration, so only one |
439 // thread at a time can change the configuration. |
447 // thread at a time can change the configuration. |
463 |
471 |
// Registers a close action: copies the current configuration, appends
// `action` to the copy and publishes it through updateConfiguration(...).
// Synchronized so that only one thread changes the configuration at a time.
464 public final synchronized void addCloseAction(Runnable action) { |
472 public final synchronized void addCloseAction(Runnable action) { |
465 updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action)); |
473 updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action)); |
466 } |
474 } |
467 |
475 |
|
// Registers an error action (present in only one of the two revisions shown):
// copies the current configuration, appends `action` to the copy and
// publishes it through updateConfiguration(...). Synchronized like the other
// configuration mutators.
476 public final synchronized void addErrorAction(Runnable action) { |
|
477 updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action)); |
|
478 } |
|
479 |
// Updates the closed flag by publishing a modified copy of the current
// configuration through updateConfiguration(...). Synchronized so that only
// one thread changes the configuration at a time.
468 public final synchronized void setClosed(boolean closed) { |
480 public final synchronized void setClosed(boolean closed) { |
469 updateConfiguration(new StreamConfiguration(configuration).setClosed(closed)); |
481 updateConfiguration(new StreamConfiguration(configuration).setClosed(closed)); |
470 } |
482 } |
471 |
483 |
472 public final synchronized void setReuse(boolean reuse) { |
484 public final synchronized void setReuse(boolean reuse) { |
490 startTime = Instant.EPOCH; |
502 startTime = Instant.EPOCH; |
491 } |
503 } |
492 updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime)); |
504 updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime)); |
493 } |
505 } |
494 |
506 |
// Sets the end time of the stream; only allowed before the stream is started.
// Throws IllegalStateException if the current configuration reports
// isStarted(). Otherwise publishes a modified copy of the configuration.
// One revision declares this method without `synchronized`, the other with
// it, matching the other configuration mutators.
495 public final void setEndTime(Instant endTime) { |
507 public final synchronized void setEndTime(Instant endTime) { |
496 if (configuration.isStarted()) { |
508 if (configuration.isStarted()) { |
497 throw new IllegalStateException("Stream is already started"); |
509 throw new IllegalStateException("Stream is already started"); |
498 } |
510 } |
499 updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime)); |
511 updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime)); |
500 } |
512 } |
501 |
|
502 |
513 |
503 protected boolean updateConfiguration(StreamConfiguration newConfiguration) { |
514 protected boolean updateConfiguration(StreamConfiguration newConfiguration) { |
504 // Changes to the configuration must happen one at a time, so make |
515 if (!Thread.holdsLock(this)) { |
505 // sure that we have the monitor |
516 throw new InternalError("Modification of configuration without proper lock"); |
506 Thread.holdsLock(this); |
517 } |
507 if (newConfiguration.hasChanged()) { |
518 if (newConfiguration.hasChanged()) { |
508 // Publish objects held by configuration object |
519 // Publish objects held by configuration object |
509 VarHandle.releaseFence(); |
520 VarHandle.releaseFence(); |
510 configuration = newConfiguration; |
521 configuration = newConfiguration; |
511 return true; |
522 return true; |
// Starts the stream: first marks the configuration as started via
// startInternal(startNanos), which throws IllegalStateException if it was
// already started, then calls run() on the calling thread.
525 public final void start(long startNanos) { |
536 public final void start(long startNanos) { |
526 startInternal(startNanos); |
537 startInternal(startNanos); |
527 run(); |
538 run(); |
528 } |
539 } |
529 |
540 |
// Marks the stream as started, exactly once.
// Throws IllegalStateException if the current configuration is already
// started. Otherwise copies the configuration, applies `startNanos` only when
// `active` is true, sets started = true and publishes the copy through
// updateConfiguration(...).
// The check and the update run under this object's monitor in both revisions:
// one uses a synchronized (this) block, the other declares the method
// synchronized.
530 private void startInternal(long startNanos) { |
541 private synchronized void startInternal(long startNanos) { |
531 synchronized (this) { |
542 if (configuration.isStarted()) { |
532 if (configuration.isStarted()) { |
543 throw new IllegalStateException("Event stream can only be started once"); |
533 throw new IllegalStateException("Event stream can only be started once"); |
544 } |
534 } |
545 StreamConfiguration c = new StreamConfiguration(configuration); |
535 StreamConfiguration c = new StreamConfiguration(configuration); |
546 if (active) { |
536 if (active) { |
547 c.setStartNanos(startNanos); |
537 c.setStartNanos(startNanos); |
548 } |
538 } |
549 c.setStarted(true); |
539 c.setStarted(true); |
550 updateConfiguration(c); |
540 updateConfiguration(c); |
|
541 } |
|
542 } |
551 } |
543 |
552 |
544 public final void awaitTermination(Duration timeout) { |
553 public final void awaitTermination(Duration timeout) { |
545 Objects.requireNonNull(timeout); |
554 Objects.requireNonNull(timeout); |
546 if (thread != Thread.currentThread()) { |
555 if (thread != Thread.currentThread()) { |