31 import java.security.AccessController; |
31 import java.security.AccessController; |
32 import java.security.PrivilegedAction; |
32 import java.security.PrivilegedAction; |
33 import java.time.Duration; |
33 import java.time.Duration; |
34 import java.time.Instant; |
34 import java.time.Instant; |
35 import java.util.ArrayList; |
35 import java.util.ArrayList; |
36 import java.util.Arrays; |
|
37 import java.util.Comparator; |
36 import java.util.Comparator; |
38 import java.util.List; |
37 import java.util.List; |
39 import java.util.Objects; |
38 import java.util.Objects; |
|
39 import java.util.concurrent.atomic.AtomicLong; |
40 import java.util.function.Consumer; |
40 import java.util.function.Consumer; |
41 |
41 |
42 import jdk.jfr.EventType; |
42 import jdk.jfr.EventType; |
43 import jdk.jfr.internal.JVM; |
43 import jdk.jfr.internal.JVM; |
44 import jdk.jfr.internal.LogLevel; |
44 import jdk.jfr.internal.LogLevel; |
45 import jdk.jfr.internal.LogTag; |
45 import jdk.jfr.internal.LogTag; |
46 import jdk.jfr.internal.Logger; |
46 import jdk.jfr.internal.Logger; |
47 import jdk.jfr.internal.LongMap; |
|
48 import jdk.jfr.internal.SecuritySupport; |
47 import jdk.jfr.internal.SecuritySupport; |
49 import jdk.jfr.internal.Utils; |
|
50 import jdk.jfr.internal.consumer.InternalEventFilter; |
|
51 |
48 |
52 /* |
49 /* |
53 * Purpose of this class is to simplify the implementation of |
50 * Purpose of this class is to simplify the implementation of |
54 * an event stream. In particular, it handles: |
51 * an event stream. In particular, it handles: |
55 * |
52 * |
60 * - security |
57 * - security |
61 * |
58 * |
62 */ |
59 */ |
63 abstract class AbstractEventStream implements EventStream { |
60 abstract class AbstractEventStream implements EventStream { |
64 |
61 |
65 protected static final class StreamConfiguration { |
62 final static class EventDispatcher { |
66 private static final Runnable[] NO_ACTIONS = new Runnable[0]; |
|
67 |
|
68 private Consumer<?>[] errorActions = new Consumer<?>[0]; |
|
69 private Runnable[] flushActions = NO_ACTIONS; |
|
70 private Runnable[] closeActions = NO_ACTIONS; |
|
71 private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS; |
|
72 private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; |
|
73 private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>(); |
|
74 |
|
75 private boolean changedConfiguration = false; |
|
76 private boolean closed = false; |
|
77 private boolean reuse = true; |
|
78 private boolean ordered = true; |
|
79 private Instant startTime = null; |
|
80 private Instant endTime = null; |
|
81 private boolean started = false; |
|
82 private long startNanos = 0; |
|
83 private long endNanos = Long.MAX_VALUE; |
|
84 |
|
/**
 * Creates a copy of another configuration.
 *
 * The action arrays, the event filter and the dispatcher lookup map are
 * copied by reference, not cloned. The "changed" flag is not copied, so the
 * new instance reports {@code hasChanged() == false} until it is mutated.
 *
 * @param configuration the configuration to copy settings from
 */
public StreamConfiguration(StreamConfiguration configuration) {
    // Registered actions and structures derived from them
    this.errorActions = configuration.errorActions;
    this.flushActions = configuration.flushActions;
    this.closeActions = configuration.closeActions;
    this.dispatchers = configuration.dispatchers;
    this.dispatcherLookup = configuration.dispatcherLookup;
    this.eventFilter = configuration.eventFilter;

    // State flags
    this.closed = configuration.closed;
    this.started = configuration.started;
    this.reuse = configuration.reuse;
    this.ordered = configuration.ordered;

    // Time window
    this.startTime = configuration.startTime;
    this.startNanos = configuration.startNanos;
    this.endTime = configuration.endTime;
    this.endNanos = configuration.endNanos;
}
|
101 |
|
102 public StreamConfiguration() { |
|
103 } |
|
104 |
|
105 public StreamConfiguration remove(Object action) { |
|
106 flushActions = remove(flushActions, action); |
|
107 closeActions = remove(closeActions, action); |
|
108 dispatchers = removeDispatch(dispatchers, action); |
|
109 return this; |
|
110 } |
|
111 |
|
112 public StreamConfiguration addDispatcher(EventDispatcher e) { |
|
113 dispatchers = add(dispatchers, e); |
|
114 eventFilter = buildFilter(dispatchers); |
|
115 dispatcherLookup = new LongMap<>(); |
|
116 return this; |
|
117 } |
|
118 |
|
119 public StreamConfiguration addFlushAction(Runnable action) { |
|
120 flushActions = add(flushActions, action); |
|
121 return this; |
|
122 } |
|
123 |
|
124 public StreamConfiguration addCloseAction(Runnable action) { |
|
125 closeActions = add(closeActions, action); |
|
126 return this; |
|
127 } |
|
128 |
|
129 public StreamConfiguration addErrorAction(Consumer<Throwable> action) { |
|
130 errorActions = add(errorActions, action); |
|
131 return this; |
|
132 } |
|
133 |
|
134 public StreamConfiguration setClosed(boolean closed) { |
|
135 this.closed = closed; |
|
136 changedConfiguration = true; |
|
137 return this; |
|
138 } |
|
139 |
|
140 public boolean isClosed() { |
|
141 return closed; |
|
142 } |
|
143 |
|
144 public Runnable[] getCloseActions() { |
|
145 return closeActions; |
|
146 } |
|
147 |
|
148 public Runnable[] getFlushActions() { |
|
149 return flushActions; |
|
150 } |
|
151 |
|
152 private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) { |
|
153 List<EventDispatcher> list = new ArrayList<>(array.length); |
|
154 boolean modified = false; |
|
155 for (int i = 0; i < array.length; i++) { |
|
156 if (array[i].action != action) { |
|
157 list.add(array[i]); |
|
158 } else { |
|
159 modified = true; |
|
160 } |
|
161 } |
|
162 EventDispatcher[] result = list.toArray(new EventDispatcher[0]); |
|
163 if (modified) { |
|
164 eventFilter = buildFilter(result); |
|
165 dispatcherLookup = new LongMap<>(); |
|
166 changedConfiguration = true; |
|
167 } |
|
168 return result; |
|
169 } |
|
170 |
|
171 private <T> T[] remove(T[] array, Object action) { |
|
172 List<T> list = new ArrayList<>(array.length); |
|
173 for (int i = 0; i < array.length; i++) { |
|
174 if (array[i] != action) { |
|
175 list.add(array[i]); |
|
176 } else { |
|
177 changedConfiguration = true; |
|
178 } |
|
179 } |
|
180 return list.toArray(array); |
|
181 } |
|
182 |
|
183 private <T> T[] add(T[] array, T object) { |
|
184 List<T> list = new ArrayList<>(Arrays.asList(array)); |
|
185 list.add(object); |
|
186 changedConfiguration = true; |
|
187 return list.toArray(array); |
|
188 } |
|
189 |
|
190 private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { |
|
191 InternalEventFilter ef = new InternalEventFilter(); |
|
192 for (EventDispatcher ed : dispatchers) { |
|
193 String name = ed.eventName; |
|
194 if (name == null) { |
|
195 return InternalEventFilter.ACCEPT_ALL; |
|
196 } |
|
197 ef.setThreshold(name, 0); |
|
198 } |
|
199 return ef; |
|
200 } |
|
201 |
|
202 public StreamConfiguration setReuse(boolean reuse) { |
|
203 this.reuse = reuse; |
|
204 changedConfiguration = true; |
|
205 return this; |
|
206 } |
|
207 |
|
208 public StreamConfiguration setOrdered(boolean ordered) { |
|
209 this.ordered = ordered; |
|
210 changedConfiguration = true; |
|
211 return this; |
|
212 } |
|
213 |
|
214 public StreamConfiguration setEndTime(Instant endTime) { |
|
215 this.endTime = endTime; |
|
216 this.endNanos = Utils.timeToNanos(endTime); |
|
217 changedConfiguration = true; |
|
218 return this; |
|
219 } |
|
220 |
|
221 public StreamConfiguration setStartTime(Instant startTime) { |
|
222 this.startTime = startTime; |
|
223 this.startNanos = Utils.timeToNanos(startTime); |
|
224 changedConfiguration = true; |
|
225 return this; |
|
226 } |
|
227 |
|
228 public Instant getStartTime() { |
|
229 return startTime; |
|
230 } |
|
231 |
|
232 public Object getEndTime() { |
|
233 return endTime; |
|
234 } |
|
235 |
|
236 public boolean isStarted() { |
|
237 return started; |
|
238 } |
|
239 |
|
240 public StreamConfiguration setStartNanos(long startNanos) { |
|
241 this.startNanos = startNanos; |
|
242 changedConfiguration = true; |
|
243 return this; |
|
244 } |
|
245 |
|
246 public void setStarted(boolean started) { |
|
247 this.started = started; |
|
248 changedConfiguration = true; |
|
249 } |
|
250 |
|
251 public boolean hasChanged() { |
|
252 return changedConfiguration; |
|
253 } |
|
254 |
|
255 public boolean getReuse() { |
|
256 return reuse; |
|
257 } |
|
258 |
|
259 public boolean getOrdered() { |
|
260 return ordered; |
|
261 } |
|
262 |
|
263 public InternalEventFilter getFiler() { |
|
264 return eventFilter; |
|
265 } |
|
266 |
|
267 public long getStartNanos() { |
|
268 return startNanos; |
|
269 } |
|
270 |
|
271 public long getEndNanos() { |
|
272 return endNanos; |
|
273 } |
|
274 |
|
275 public InternalEventFilter getFilter() { |
|
276 return eventFilter; |
|
277 } |
|
278 |
|
279 public String toString() { |
|
280 StringBuilder sb = new StringBuilder(); |
|
281 for (Runnable flush : flushActions) { |
|
282 sb.append("Flush Action: ").append(flush).append("\n"); |
|
283 } |
|
284 for (Runnable close : closeActions) { |
|
285 sb.append("Close Action: " + close + "\n"); |
|
286 } |
|
287 for (Consumer<?> error : errorActions) { |
|
288 sb.append("Error Action: " + error + "\n"); |
|
289 } |
|
290 for (EventDispatcher dispatcher : dispatchers) { |
|
291 sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n"); |
|
292 } |
|
293 sb.append("Closed: ").append(closed).append("\n"); |
|
294 sb.append("Reuse: ").append(reuse).append("\n"); |
|
295 sb.append("Ordered: ").append(ordered).append("\n"); |
|
296 sb.append("Started: ").append(started).append("\n"); |
|
297 sb.append("Start Time: ").append(startTime).append("\n"); |
|
298 sb.append("Start Nanos: ").append(startNanos).append("\n"); |
|
299 sb.append("End Time: ").append(endTime).append("\n"); |
|
300 sb.append("End Nanos: ").append(endNanos).append("\n"); |
|
301 return sb.toString(); |
|
302 } |
|
303 |
|
304 private EventDispatcher[] getDispatchers() { |
|
305 return dispatchers; |
|
306 } |
|
307 } |
|
308 |
|
309 private final static class EventDispatcher { |
|
310 final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; |
63 final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; |
311 final private String eventName; |
64 final String eventName; |
312 final private Consumer<RecordedEvent> action; |
65 final Consumer<RecordedEvent> action; |
313 |
66 |
314 public EventDispatcher(Consumer<RecordedEvent> action) { |
67 public EventDispatcher(Consumer<RecordedEvent> action) { |
315 this(null, action); |
68 this(null, action); |
316 } |
69 } |
317 |
70 |
328 return (eventName == null || eventType.getName().equals(eventName)); |
81 return (eventName == null || eventType.getName().equals(eventName)); |
329 } |
82 } |
330 } |
83 } |
331 |
84 |
332 final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); |
85 final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); |
333 |
86 private final static AtomicLong counter = new AtomicLong(1); |
334 private final Thread thread; |
87 private volatile Thread thread; |
|
88 private final Object terminated = new Object(); |
335 private final boolean active; |
89 private final boolean active; |
336 private final Runnable flushOperation = () -> runFlushActions(); |
90 private final Runnable flushOperation = () -> runFlushActions(); |
337 private final AccessControlContext accessControllerContext; |
91 private final AccessControlContext accessControllerContext; |
338 private final Object configurationLock = new Object(); |
92 private final Object configurationLock = new Object(); |
339 |
93 |
340 // Modified by updateConfiguration() |
94 // Modified by updateConfiguration() |
341 protected volatile StreamConfiguration configuration = new StreamConfiguration(); |
95 protected volatile StreamConfiguration configuration = new StreamConfiguration(); |
342 |
96 |
343 // Cache the last event type and dispatch. |
|
344 private EventType lastEventType; |
|
345 private EventDispatcher[] lastEventDispatch; |
|
346 |
|
347 public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException { |
97 public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException { |
348 this.accessControllerContext = Objects.requireNonNull(acc); |
98 this.accessControllerContext = Objects.requireNonNull(acc); |
349 this.active = active; |
99 this.active = active; |
350 this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc)); |
|
351 } |
100 } |
352 |
101 |
353 @Override |
102 @Override |
354 abstract public void start(); |
103 abstract public void start(); |
355 |
104 |
453 return updateConfiguration(new StreamConfiguration(configuration).remove(action)); |
202 return updateConfiguration(new StreamConfiguration(configuration).remove(action)); |
454 } |
203 } |
455 } |
204 } |
456 |
205 |
457 @Override |
206 @Override |
458 public final void awaitTermination() { |
207 public final void awaitTermination() throws InterruptedException { |
459 awaitTermination(Duration.ofMillis(0)); |
208 awaitTermination(Duration.ofMillis(0)); |
460 } |
209 } |
461 |
210 |
462 @Override |
211 @Override |
463 public final void awaitTermination(Duration timeout) { |
212 public final void awaitTermination(Duration timeout) throws InterruptedException { |
464 Objects.requireNonNull(timeout); |
213 Objects.requireNonNull(timeout); |
465 if (thread != Thread.currentThread()) { |
214 if (timeout.isNegative()) { |
466 try { |
215 throw new IllegalArgumentException("timeout value is negative"); |
467 thread.join(timeout.toMillis()); |
216 } |
468 } catch (InterruptedException e) { |
217 |
469 // ignore |
218 long base = System.currentTimeMillis(); |
|
219 long now = 0; |
|
220 |
|
221 long millis; |
|
222 try { |
|
223 millis = Math.multiplyExact(timeout.getSeconds(), 1000); |
|
224 } catch (ArithmeticException a) { |
|
225 millis = Long.MAX_VALUE; |
|
226 } |
|
227 int nanos = timeout.toNanosPart(); |
|
228 if (nanos == 0 && millis == 0) { |
|
229 synchronized (terminated) { |
|
230 while (!isClosed()) { |
|
231 terminated.wait(0); |
|
232 } |
|
233 } |
|
234 } else { |
|
235 while (!isClosed()) { |
|
236 long delay = millis - now; |
|
237 if (delay <= 0) { |
|
238 break; |
|
239 } |
|
240 synchronized (terminated) { |
|
241 terminated.wait(delay, nanos); |
|
242 } |
|
243 now = System.currentTimeMillis() - base; |
470 } |
244 } |
471 } |
245 } |
472 } |
246 } |
473 |
247 |
474 protected abstract void process() throws Exception; |
248 protected abstract void process() throws Exception; |
475 |
249 |
476 protected final void clearLastDispatch() { |
250 protected final void dispatch(StreamConfiguration c, RecordedEvent event) { |
477 lastEventDispatch = null; |
|
478 lastEventType = null; |
|
479 } |
|
480 |
|
481 protected final void dispatch(RecordedEvent event) { |
|
482 EventType type = event.getEventType(); |
251 EventType type = event.getEventType(); |
483 EventDispatcher[] dispatchers = null; |
252 EventDispatcher[] dispatchers = null; |
484 if (type == lastEventType) { |
253 if (type == c.cacheEventType) { |
485 dispatchers = lastEventDispatch; |
254 dispatchers = c.cacheDispatchers; |
486 } else { |
255 } else { |
487 dispatchers = configuration.dispatcherLookup.get(type.getId()); |
256 dispatchers = c.dispatcherLookup.get(type.getId()); |
488 if (dispatchers == null) { |
257 if (dispatchers == null) { |
489 List<EventDispatcher> list = new ArrayList<>(); |
258 List<EventDispatcher> list = new ArrayList<>(); |
490 for (EventDispatcher e : configuration.getDispatchers()) { |
259 for (EventDispatcher e : c.getDispatchers()) { |
491 if (e.accepts(type)) { |
260 if (e.accepts(type)) { |
492 list.add(e); |
261 list.add(e); |
493 } |
262 } |
494 } |
263 } |
495 dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]); |
264 dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]); |
496 configuration.dispatcherLookup.put(type.getId(), dispatchers); |
265 c.dispatcherLookup.put(type.getId(), dispatchers); |
497 } |
266 } |
498 lastEventDispatch = dispatchers; |
267 c.cacheDispatchers = dispatchers; |
499 } |
268 } |
500 for (int i = 0; i < dispatchers.length; i++) { |
269 for (int i = 0; i < dispatchers.length; i++) { |
501 try { |
270 try { |
502 dispatchers[i].offer(event); |
271 dispatchers[i].offer(event); |
503 } catch (Exception e) { |
272 } catch (Exception e) { |