src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
branchJEP-349-branch
changeset 57372 50ca040843ea
parent 57361 53dccc90a5be
child 57374 41f0051285e0
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Fri May 24 19:39:31 2019 +0200
@@ -25,85 +25,111 @@
 
 package jdk.jfr.consumer;
 
+import java.io.IOException;
 import java.nio.file.Path;
+import java.security.AccessControlContext;
+import java.security.AccessController;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.function.Consumer;
 
+import jdk.jfr.internal.consumer.EventConsumer;
+import jdk.jfr.internal.consumer.RecordingInput;
+
 /**
  * Implementation of an event stream that operates against a recording file.
  *
  */
 final class EventFileStream implements EventStream {
 
-    public EventFileStream(Path path) {
+    final static class FileEventConsumer extends EventConsumer {
+        private final RecordingInput input;
+
+        public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
+            super(acc);
+            this.input = input;
+        }
+
+        @Override
+        public void process() throws Exception {
+            // TODO This need more work; filter, multiple chunk etc
+            ChunkParser cp = new ChunkParser(input);
+            while (true) {
+                RecordedEvent e = cp.readEvent();
+                dispatch(e);
+            }
+        }
+    }
+
+    private final RecordingInput input;
+    private final FileEventConsumer eventConsumer;
+
+    public EventFileStream(Path path) throws IOException {
         Objects.requireNonNull(path);
+        input = new RecordingInput(path.toFile());
+        eventConsumer = new FileEventConsumer(AccessController.getContext(), input);
     }
 
     @Override
     public void onEvent(Consumer<RecordedEvent> action) {
         Objects.requireNonNull(action);
-        notImplemented();
-    }
-
-    public void onEvent(EventFilter filter, Consumer<RecordedEvent> action) {
-        Objects.requireNonNull(filter);
-        Objects.requireNonNull(action);
-        notImplemented();
+        eventConsumer.onEvent(action);
     }
 
     @Override
     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
         Objects.requireNonNull(eventName);
         Objects.requireNonNull(action);
-        notImplemented();
+        eventConsumer.onEvent(eventName, action);
     }
 
     @Override
     public void onFlush(Runnable action) {
         Objects.requireNonNull(action);
-        notImplemented();
+        eventConsumer.onFlush(action);
     }
 
     @Override
     public void onClose(Runnable action) {
         Objects.requireNonNull(action);
-        notImplemented();
+        eventConsumer.addCloseAction(action);
     }
 
     @Override
     public void close() {
-        notImplemented();
+        eventConsumer.setClosed(true);
+        eventConsumer.runCloseActions();
+        try {
+            input.close();
+        } catch (IOException e) {
+            // ignore
+        }
     }
 
     @Override
     public boolean remove(Object action) {
         Objects.requireNonNull(action);
-        notImplemented();
-        return false;
+        return eventConsumer.remove(action);
     }
 
     @Override
     public void start() {
-        notImplemented();
+        eventConsumer.start(0);
     }
 
     @Override
     public void startAsync() {
-        notImplemented();
+        eventConsumer.startAsync(0);
     }
 
     @Override
     public void awaitTermination(Duration timeout) {
         Objects.requireNonNull(timeout);
+        eventConsumer.awaitTermination(timeout);
     }
 
     @Override
     public void awaitTermination() {
-        notImplemented();
-    }
-
-    private static void notImplemented() {
-        throw new UnsupportedOperationException("Streaming for files not yet implemenetd");
+        eventConsumer.awaitTermination();
     }
 }