108 |
108 |
109 protected boolean ordered = true; |
109 protected boolean ordered = true; |
110 |
110 |
111 protected boolean reuse = true; |
111 protected boolean reuse = true; |
112 |
112 |
113 protected InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; |
113 protected volatile InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; |
114 |
114 |
115 private final AccessControlContext accessControlContext; |
115 private final AccessControlContext accessControlContext; |
116 private boolean started; |
116 private boolean started; |
117 private Thread thread; |
117 private Thread thread; |
118 |
118 |
182 remove = true; |
182 remove = true; |
183 } |
183 } |
184 } |
184 } |
185 if (removeConsumer) { |
185 if (removeConsumer) { |
186 EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]); |
186 EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]); |
|
187 eventFilter = buildFilter(array); |
187 consumersHandle.setVolatile(this, array); |
188 consumersHandle.setVolatile(this, array); |
188 dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset |
189 dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset |
189 // dispatch |
190 // dispatch |
190 } |
191 } |
191 return remove; |
192 return remove; |
219 |
220 |
220 public void onEvent(String eventName, Consumer<RecordedEvent> action) { |
221 public void onEvent(String eventName, Consumer<RecordedEvent> action) { |
221 add(new EventDispatcher(eventName, action)); |
222 add(new EventDispatcher(eventName, action)); |
222 } |
223 } |
223 |
224 |
|
225 InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { |
|
226 InternalEventFilter ef = new InternalEventFilter(); |
|
227 for (EventDispatcher ed : dispatchers) { |
|
228 String name = ed.eventName; |
|
229 if (name == null) { |
|
230 return InternalEventFilter.ACCEPT_ALL; |
|
231 } |
|
232 ef.setThreshold(name, 0); |
|
233 } |
|
234 return ef.threadSafe(); |
|
235 } |
|
236 |
224 private synchronized void add(EventDispatcher e) { |
237 private synchronized void add(EventDispatcher e) { |
225 consumersHandle.setVolatile(this, merge(consumers, e)); |
238 EventDispatcher[] dispatchers = merge(consumers,e); |
|
239 eventFilter = buildFilter(dispatchers); |
|
240 consumersHandle.setVolatile(this, dispatchers); |
226 dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset |
241 dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset |
227 } |
242 } |
228 |
243 |
229 public synchronized void onFlush(Runnable action) { |
244 public synchronized void onFlush(Runnable action) { |
230 flushActionsHandle.setVolatile(this, addAction(flushActions, action)); |
245 flushActionsHandle.setVolatile(this, addAction(flushActions, action)); |