--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri May 24 19:39:31 2019 +0200
@@ -32,141 +32,144 @@
import java.util.Objects;
import java.util.function.Consumer;
-import jdk.jfr.EventType;
+import jdk.jfr.internal.consumer.EventConsumer;
final class EventDirectoryStream implements EventStream {
- public final static class EventConsumer {
- final private String eventName;
- final Consumer<RecordedEvent> action;
+ private static class EventRunner extends EventConsumer {
+ private EventSetLocation location;
+ private EventSet eventSet;
+ private int eventSetIndex;
+ private int eventArrayIndex;
+ private RecordedEvent[] currentEventArray = new RecordedEvent[0];
- EventConsumer(String eventName, Consumer<RecordedEvent> eventConsumer) {
- this.eventName = eventName;
- this.action = eventConsumer;
+ public EventRunner(AccessControlContext acc) throws IOException {
+ super(acc);
}
- public void offer(RecordedEvent event) {
- action.accept(event);
+ public void process() throws Exception, IOException {
+ this.location = EventSetLocation.current();
+ this.eventSet = location.acquire(startNanos, null); // use timestamp
+ // from
+ if (eventSet == null) {
+ return;
+ }
+ while (!isClosed()) {
+ processSegment();
+ runFlushActions();
+ do {
+ if (isClosed()) {
+ return;
+ }
+ currentEventArray = eventSet.readEvents(eventSetIndex);
+ if (currentEventArray == EventSet.END_OF_SET) {
+ eventSet = eventSet.next(eventFilter);
+ if (eventSet == null || isClosed()) {
+ return;
+ }
+ eventSetIndex = 0;
+ continue;
+ }
+ if (currentEventArray == null) {
+ return; // no more events
+ }
+ eventSetIndex++;
+ } while (currentEventArray.length == 0);
+ eventArrayIndex = 0;
+ }
}
- public boolean accepts(EventType eventType) {
- return (eventName == null || eventType.getName().equals(eventName));
+ private void processSegment() {
+ while (eventArrayIndex < currentEventArray.length) {
+ RecordedEvent e = currentEventArray[eventArrayIndex++];
+ if (e == null) {
+ return;
+ }
+ dispatch(e);
+ }
+ }
+
+ public void close() {
+ setClosed(true);
+ // TODO: Data races here, must fix
+ synchronized (this) {
+ if (eventSet != null) {
+ eventSet.release(null);
+ }
+ if (location != null) {
+ location.release();
+ }
+ }
+ runCloseActions();
}
}
- private final EventRunner eventRunner;
- private Thread thread;
- private boolean started;
+ private final EventRunner eventConsumer;
public EventDirectoryStream(AccessControlContext acc) throws IOException {
- eventRunner = new EventRunner(acc);
+ eventConsumer = new EventRunner(acc);
}
public void close() {
- synchronized (eventRunner) {
- eventRunner.close();
- }
+ eventConsumer.close();
}
- public synchronized void onFlush(Runnable action) {
+ public void onFlush(Runnable action) {
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- this.eventRunner.addFlush(action);
- }
+ eventConsumer.onFlush(action);
}
void start(long startNanos) {
- synchronized (eventRunner) {
- if (started) {
- throw new IllegalStateException("Event stream can only be started once");
- }
- started = true;
- eventRunner.setStartNanos(startNanos);
- }
- eventRunner.run();
+ eventConsumer.start(startNanos);
}
@Override
public void start() {
- start(Instant.now().toEpochMilli() * 1000*1000L);
+ start(Instant.now().toEpochMilli() * 1000 * 1000L);
}
@Override
public void startAsync() {
- startAsync(Instant.now().toEpochMilli() * 1000*1000L);
+ startAsync(Instant.now().toEpochMilli() * 1000 * 1000L);
}
void startAsync(long startNanos) {
- synchronized (eventRunner) {
- eventRunner.setStartNanos(startNanos);
- thread = new Thread(eventRunner);
- thread.setDaemon(true);
- thread.start();
- }
+ eventConsumer.startAsync(startNanos);
}
- public void addEventConsumer(EventConsumer action) {
- Objects.requireNonNull(action);
- synchronized (eventRunner) {
- eventRunner.add(action);
- }
- }
-
-
-
@Override
public void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- eventRunner.add(new EventConsumer(null, action));
- }
+ eventConsumer.onEvent(action);
}
@Override
public void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- eventRunner.add(new EventConsumer(eventName, action));
- }
+ eventConsumer.onEvent(eventName, action);
}
@Override
public void onClose(Runnable action) {
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- eventRunner.addCloseAction(action);
- }
+ eventConsumer.addCloseAction(action);
}
@Override
public boolean remove(Object action) {
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- return eventRunner.remove(action);
- }
+ return eventConsumer.remove(action);
}
@Override
public void awaitTermination(Duration timeout) {
Objects.requireNonNull(timeout);
- Thread t = null;
- synchronized (eventRunner) {
- t = thread;
- }
- if (t != null && t != Thread.currentThread()) {
- try {
- t.join(timeout.toMillis());
- } catch (InterruptedException e) {
- // ignore
- }
- }
+ eventConsumer.awaitTermination(timeout);
}
@Override
public void awaitTermination() {
- awaitTermination(Duration.ofMillis(0));
+ eventConsumer.awaitTermination(Duration.ofMillis(0));
}
-
-
}