--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Thu Sep 05 16:46:50 2019 +0200
@@ -33,10 +33,10 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import jdk.jfr.EventType;
@@ -44,10 +44,7 @@
import jdk.jfr.internal.LogLevel;
import jdk.jfr.internal.LogTag;
import jdk.jfr.internal.Logger;
-import jdk.jfr.internal.LongMap;
import jdk.jfr.internal.SecuritySupport;
-import jdk.jfr.internal.Utils;
-import jdk.jfr.internal.consumer.InternalEventFilter;
/*
* Purpose of this class is to simplify the implementation of
@@ -62,254 +59,10 @@
*/
abstract class AbstractEventStream implements EventStream {
- protected static final class StreamConfiguration {
- private static final Runnable[] NO_ACTIONS = new Runnable[0];
-
- private Consumer<?>[] errorActions = new Consumer<?>[0];
- private Runnable[] flushActions = NO_ACTIONS;
- private Runnable[] closeActions = NO_ACTIONS;
- private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
- private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
- private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
-
- private boolean changedConfiguration = false;
- private boolean closed = false;
- private boolean reuse = true;
- private boolean ordered = true;
- private Instant startTime = null;
- private Instant endTime = null;
- private boolean started = false;
- private long startNanos = 0;
- private long endNanos = Long.MAX_VALUE;
-
- public StreamConfiguration(StreamConfiguration configuration) {
- this.flushActions = configuration.flushActions;
- this.closeActions = configuration.closeActions;
- this.errorActions = configuration.errorActions;
- this.dispatchers = configuration.dispatchers;
- this.eventFilter = configuration.eventFilter;
- this.closed = configuration.closed;
- this.reuse = configuration.reuse;
- this.ordered = configuration.ordered;
- this.startTime = configuration.startTime;
- this.endTime = configuration.endTime;
- this.started = configuration.started;
- this.startNanos = configuration.startNanos;
- this.endNanos = configuration.endNanos;
- this.dispatcherLookup = configuration.dispatcherLookup;
- }
-
- public StreamConfiguration() {
- }
-
- public StreamConfiguration remove(Object action) {
- flushActions = remove(flushActions, action);
- closeActions = remove(closeActions, action);
- dispatchers = removeDispatch(dispatchers, action);
- return this;
- }
-
- public StreamConfiguration addDispatcher(EventDispatcher e) {
- dispatchers = add(dispatchers, e);
- eventFilter = buildFilter(dispatchers);
- dispatcherLookup = new LongMap<>();
- return this;
- }
-
- public StreamConfiguration addFlushAction(Runnable action) {
- flushActions = add(flushActions, action);
- return this;
- }
-
- public StreamConfiguration addCloseAction(Runnable action) {
- closeActions = add(closeActions, action);
- return this;
- }
-
- public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
- errorActions = add(errorActions, action);
- return this;
- }
-
- public StreamConfiguration setClosed(boolean closed) {
- this.closed = closed;
- changedConfiguration = true;
- return this;
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- public Runnable[] getCloseActions() {
- return closeActions;
- }
-
- public Runnable[] getFlushActions() {
- return flushActions;
- }
-
- private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
- List<EventDispatcher> list = new ArrayList<>(array.length);
- boolean modified = false;
- for (int i = 0; i < array.length; i++) {
- if (array[i].action != action) {
- list.add(array[i]);
- } else {
- modified = true;
- }
- }
- EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
- if (modified) {
- eventFilter = buildFilter(result);
- dispatcherLookup = new LongMap<>();
- changedConfiguration = true;
- }
- return result;
- }
-
- private <T> T[] remove(T[] array, Object action) {
- List<T> list = new ArrayList<>(array.length);
- for (int i = 0; i < array.length; i++) {
- if (array[i] != action) {
- list.add(array[i]);
- } else {
- changedConfiguration = true;
- }
- }
- return list.toArray(array);
- }
-
- private <T> T[] add(T[] array, T object) {
- List<T> list = new ArrayList<>(Arrays.asList(array));
- list.add(object);
- changedConfiguration = true;
- return list.toArray(array);
- }
-
- private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
- InternalEventFilter ef = new InternalEventFilter();
- for (EventDispatcher ed : dispatchers) {
- String name = ed.eventName;
- if (name == null) {
- return InternalEventFilter.ACCEPT_ALL;
- }
- ef.setThreshold(name, 0);
- }
- return ef;
- }
-
- public StreamConfiguration setReuse(boolean reuse) {
- this.reuse = reuse;
- changedConfiguration = true;
- return this;
- }
-
- public StreamConfiguration setOrdered(boolean ordered) {
- this.ordered = ordered;
- changedConfiguration = true;
- return this;
- }
-
- public StreamConfiguration setEndTime(Instant endTime) {
- this.endTime = endTime;
- this.endNanos = Utils.timeToNanos(endTime);
- changedConfiguration = true;
- return this;
- }
-
- public StreamConfiguration setStartTime(Instant startTime) {
- this.startTime = startTime;
- this.startNanos = Utils.timeToNanos(startTime);
- changedConfiguration = true;
- return this;
- }
-
- public Instant getStartTime() {
- return startTime;
- }
-
- public Object getEndTime() {
- return endTime;
- }
-
- public boolean isStarted() {
- return started;
- }
-
- public StreamConfiguration setStartNanos(long startNanos) {
- this.startNanos = startNanos;
- changedConfiguration = true;
- return this;
- }
-
- public void setStarted(boolean started) {
- this.started = started;
- changedConfiguration = true;
- }
-
- public boolean hasChanged() {
- return changedConfiguration;
- }
-
- public boolean getReuse() {
- return reuse;
- }
-
- public boolean getOrdered() {
- return ordered;
- }
-
- public InternalEventFilter getFiler() {
- return eventFilter;
- }
-
- public long getStartNanos() {
- return startNanos;
- }
-
- public long getEndNanos() {
- return endNanos;
- }
-
- public InternalEventFilter getFilter() {
- return eventFilter;
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (Runnable flush : flushActions) {
- sb.append("Flush Action: ").append(flush).append("\n");
- }
- for (Runnable close : closeActions) {
- sb.append("Close Action: " + close + "\n");
- }
- for (Consumer<?> error : errorActions) {
- sb.append("Error Action: " + error + "\n");
- }
- for (EventDispatcher dispatcher : dispatchers) {
- sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
- }
- sb.append("Closed: ").append(closed).append("\n");
- sb.append("Reuse: ").append(reuse).append("\n");
- sb.append("Ordered: ").append(ordered).append("\n");
- sb.append("Started: ").append(started).append("\n");
- sb.append("Start Time: ").append(startTime).append("\n");
- sb.append("Start Nanos: ").append(startNanos).append("\n");
- sb.append("End Time: ").append(endTime).append("\n");
- sb.append("End Nanos: ").append(endNanos).append("\n");
- return sb.toString();
- }
-
- private EventDispatcher[] getDispatchers() {
- return dispatchers;
- }
- }
-
- private final static class EventDispatcher {
+ final static class EventDispatcher {
final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
- final private String eventName;
- final private Consumer<RecordedEvent> action;
+ final String eventName;
+ final Consumer<RecordedEvent> action;
public EventDispatcher(Consumer<RecordedEvent> action) {
this(null, action);
@@ -330,8 +83,9 @@
}
final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
-
- private final Thread thread;
+ private final static AtomicLong counter = new AtomicLong(1);
+ private volatile Thread thread;
+ private final Object terminated = new Object();
private final boolean active;
private final Runnable flushOperation = () -> runFlushActions();
private final AccessControlContext accessControllerContext;
@@ -340,14 +94,9 @@
// Modified by updateConfiguration()
protected volatile StreamConfiguration configuration = new StreamConfiguration();
- // Cache the last event type and dispatch.
- private EventType lastEventType;
- private EventDispatcher[] lastEventDispatch;
-
public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
this.accessControllerContext = Objects.requireNonNull(acc);
this.active = active;
- this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc));
}
@Override
@@ -360,7 +109,7 @@
abstract public void close();
// Purpose of synchronizing the following methods is
- // to serialize changes to the configuration, so only one
+ // to serialize changes to the configuration so only one
// thread at a time can change the configuration.
//
// The purpose is not to guard the configuration field. A new
@@ -455,47 +204,67 @@
}
@Override
- public final void awaitTermination() {
+ public final void awaitTermination() throws InterruptedException {
awaitTermination(Duration.ofMillis(0));
}
@Override
- public final void awaitTermination(Duration timeout) {
+ public final void awaitTermination(Duration timeout) throws InterruptedException {
Objects.requireNonNull(timeout);
- if (thread != Thread.currentThread()) {
- try {
- thread.join(timeout.toMillis());
- } catch (InterruptedException e) {
- // ignore
+ if (timeout.isNegative()) {
+ throw new IllegalArgumentException("timeout value is negative");
+ }
+
+ long base = System.currentTimeMillis();
+ long now = 0;
+
+ long millis;
+ try {
+ millis = Math.multiplyExact(timeout.getSeconds(), 1000);
+ } catch (ArithmeticException a) {
+ millis = Long.MAX_VALUE;
+ }
+ int nanos = timeout.toNanosPart();
+ if (nanos == 0 && millis == 0) {
+ synchronized (terminated) {
+ while (!isClosed()) {
+ terminated.wait(0);
+ }
+ }
+ } else {
+ while (!isClosed()) {
+ long delay = millis - now;
+ if (delay <= 0) {
+ break;
+ }
+ synchronized (terminated) {
+ terminated.wait(delay, nanos);
+ }
+ now = System.currentTimeMillis() - base;
}
}
}
protected abstract void process() throws Exception;
- protected final void clearLastDispatch() {
- lastEventDispatch = null;
- lastEventType = null;
- }
-
- protected final void dispatch(RecordedEvent event) {
+ protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
EventType type = event.getEventType();
EventDispatcher[] dispatchers = null;
- if (type == lastEventType) {
- dispatchers = lastEventDispatch;
+ if (type == c.cacheEventType) {
+ dispatchers = c.cacheDispatchers;
} else {
- dispatchers = configuration.dispatcherLookup.get(type.getId());
+ dispatchers = c.dispatcherLookup.get(type.getId());
if (dispatchers == null) {
List<EventDispatcher> list = new ArrayList<>();
- for (EventDispatcher e : configuration.getDispatchers()) {
+ for (EventDispatcher e : c.getDispatchers()) {
if (e.accepts(type)) {
list.add(e);
}
}
dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
- configuration.dispatcherLookup.put(type.getId(), dispatchers);
+ c.dispatcherLookup.put(type.getId(), dispatchers);
}
- lastEventDispatch = dispatchers;
+ c.cacheDispatchers = dispatchers;
}
for (int i = 0; i < dispatchers.length; i++) {
try {
@@ -529,11 +298,14 @@
protected final void startAsync(long startNanos) {
startInternal(startNanos);
+ Runnable r = () -> run(accessControllerContext);
+ thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r);
thread.start();
}
protected final void start(long startNanos) {
startInternal(startNanos);
+ thread = Thread.currentThread();
run(accessControllerContext);
}
@@ -580,6 +352,13 @@
defaultErrorHandler(e);
} finally {
Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
+ try {
+ close();
+ } finally {
+ synchronized (terminated) {
+ terminated.notifyAll();
+ }
+ }
}
}
@@ -591,15 +370,11 @@
}
for (int i = 0; i < consumers.length; i++) {
@SuppressWarnings("unchecked")
- Consumer<Throwable> c = (Consumer<Throwable>) consumers[i];
- c.accept(e);
+ Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
+ conusmer.accept(e);
}
}
- private void defaultErrorHandler(Throwable e) {
- e.printStackTrace();
- }
-
private void runFlushActions() {
Runnable[] flushActions = configuration.getFlushActions();
for (int i = 0; i < flushActions.length; i++) {
@@ -611,13 +386,22 @@
}
}
- private void run(AccessControlContext acc) {
+ private void run(AccessControlContext accessControlContext) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
execute();
return null;
}
- }, acc);
+ }, accessControlContext);
+ }
+
+ private String nextThreadName() {
+ counter.incrementAndGet();
+ return "JFR Event Stream " + counter;
+ }
+
+ private void defaultErrorHandler(Throwable e) {
+ e.printStackTrace();
}
}
\ No newline at end of file