--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri May 24 19:39:31 2019 +0200
@@ -25,85 +25,111 @@
package jdk.jfr.consumer;
+import java.io.IOException;
import java.nio.file.Path;
+import java.security.AccessControlContext;
+import java.security.AccessController;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
+import jdk.jfr.internal.consumer.EventConsumer;
+import jdk.jfr.internal.consumer.RecordingInput;
+
/**
* Implementation of an event stream that operates against a recording file.
*
*/
final class EventFileStream implements EventStream {
- public EventFileStream(Path path) {
+ final static class FileEventConsumer extends EventConsumer {
+ private final RecordingInput input;
+
+ public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
+ super(acc);
+ this.input = input;
+ }
+
+ @Override
+ public void process() throws Exception {
+ // TODO This need more work; filter, multiple chunk etc
+ ChunkParser cp = new ChunkParser(input);
+ while (true) {
+ RecordedEvent e = cp.readEvent();
+ dispatch(e);
+ }
+ }
+ }
+
+ private final RecordingInput input;
+ private final FileEventConsumer eventConsumer;
+
+ public EventFileStream(Path path) throws IOException {
Objects.requireNonNull(path);
+ input = new RecordingInput(path.toFile());
+ eventConsumer = new FileEventConsumer(AccessController.getContext(), input);
}
@Override
public void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- notImplemented();
- }
-
- public void onEvent(EventFilter filter, Consumer<RecordedEvent> action) {
- Objects.requireNonNull(filter);
- Objects.requireNonNull(action);
- notImplemented();
+ eventConsumer.onEvent(action);
}
@Override
public void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- notImplemented();
+ eventConsumer.onEvent(eventName, action);
}
@Override
public void onFlush(Runnable action) {
Objects.requireNonNull(action);
- notImplemented();
+ eventConsumer.onFlush(action);
}
@Override
public void onClose(Runnable action) {
Objects.requireNonNull(action);
- notImplemented();
+ eventConsumer.addCloseAction(action);
}
@Override
public void close() {
- notImplemented();
+ eventConsumer.setClosed(true);
+ eventConsumer.runCloseActions();
+ try {
+ input.close();
+ } catch (IOException e) {
+ // ignore
+ }
}
@Override
public boolean remove(Object action) {
Objects.requireNonNull(action);
- notImplemented();
- return false;
+ return eventConsumer.remove(action);
}
@Override
public void start() {
- notImplemented();
+ eventConsumer.start(0);
}
@Override
public void startAsync() {
- notImplemented();
+ eventConsumer.startAsync(0);
}
@Override
public void awaitTermination(Duration timeout) {
Objects.requireNonNull(timeout);
+ eventConsumer.awaitTermination(timeout);
}
@Override
public void awaitTermination() {
- notImplemented();
- }
-
- private static void notImplemented() {
- throw new UnsupportedOperationException("Streaming for files not yet implemenetd");
+ eventConsumer.awaitTermination();
}
}