src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
branchJEP-349-branch
changeset 57604 838f9a7635b6
parent 57452 6fabe73e5d9a
child 57690 9316d02dd4a5
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Wed Jul 31 14:07:44 2019 +0200
@@ -43,39 +43,41 @@
  */
 final class EventFileStream implements EventStream {
 
-    private final static class FileConsumer extends EventConsumer {
+    private final static class FileStream extends AbstractEventStream {
         private static final int DEFAULT_ARRAY_SIZE = 100_000;
+
         private final RecordingInput input;
+
         private ChunkParser chunkParser;
-        private boolean reuse = true;
         private RecordedEvent[] sortedList;
-        private boolean ordered;
 
-        public FileConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
+        public FileStream(AccessControlContext acc, Path path) throws IOException {
             super(acc);
-            this.input = input;
-        }
+            this.input = new RecordingInput(path.toFile());
+;        }
 
         @Override
         public void process() throws IOException {
-            chunkParser = new ChunkParser(input, reuse);
+            StreamConfiguration c1 = configuration;
+            chunkParser = new ChunkParser(input, c1.getReuse());
             while (!isClosed()) {
-                boolean reuse = this.reuse;
-                boolean ordered = this.ordered;
-                chunkParser.setReuse(reuse);
+                StreamConfiguration c2 = configuration;
+                boolean ordered = c2.getOrdered();
+                chunkParser.setReuse(c2.getReuse());
                 chunkParser.setOrdered(ordered);
                 chunkParser.resetEventCache();
-                chunkParser.setParserFilter(eventFilter);
+                chunkParser.setParserFilter(c2.getFiler());
                 chunkParser.updateEventParsers();
+                clearLastDispatch();
                 if (ordered) {
                     processOrdered();
                 } else {
                     processUnordered();
                 }
+                runFlushActions();
                 if (chunkParser.isLastChunk()) {
                     return;
                 }
-                runFlushActions();
                 chunkParser = chunkParser.nextChunkParser();
             }
         }
@@ -115,106 +117,94 @@
             }
         }
 
-        public void setReuse(boolean reuse) {
-            this.reuse = reuse;
-        }
-
-        public void setOrdered(boolean ordered) {
-            this.ordered = ordered;
-        }
-
         @Override
         public void close() {
-
+            setClosed(true);;
+            runCloseActions();
+            try {
+                input.close();
+            } catch (IOException e) {
+                // ignore
+            }
         }
-
-
     }
 
-    private final RecordingInput input;
-    private final FileConsumer eventConsumer;
+    private final FileStream eventStream;
 
     public EventFileStream(Path path, Instant from, Instant to) throws IOException {
         Objects.requireNonNull(path);
-        input = new RecordingInput(path.toFile());
-        eventConsumer = new FileConsumer(AccessController.getContext(), input);
+        eventStream = new FileStream(AccessController.getContext(), path);
     }
 
     @Override
     public void onEvent(Consumer<RecordedEvent> action) {
         Objects.requireNonNull(action);
-        eventConsumer.onEvent(action);
+        eventStream.onEvent(action);
     }
 
     @Override
     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
         Objects.requireNonNull(eventName);
         Objects.requireNonNull(action);
-        eventConsumer.onEvent(eventName, action);
+        eventStream.onEvent(eventName, action);
     }
 
     @Override
     public void onFlush(Runnable action) {
         Objects.requireNonNull(action);
-        eventConsumer.onFlush(action);
+        eventStream.onFlush(action);
     }
 
     @Override
     public void onClose(Runnable action) {
         Objects.requireNonNull(action);
-        eventConsumer.addCloseAction(action);
+        eventStream.addCloseAction(action);
     }
 
     @Override
     public void close() {
-        eventConsumer.setClosed(true);
-        eventConsumer.runCloseActions();
-        try {
-            input.close();
-        } catch (IOException e) {
-            // ignore
-        }
+        eventStream.close();
     }
 
     @Override
     public boolean remove(Object action) {
         Objects.requireNonNull(action);
-        return eventConsumer.remove(action);
+        return eventStream.remove(action);
     }
 
     @Override
     public void start() {
-        eventConsumer.start(0);
+        eventStream.start(0);
     }
 
     @Override
     public void setReuse(boolean reuse) {
-        eventConsumer.setReuse(reuse);
+        eventStream.setReuse(reuse);
     }
 
     @Override
     public void startAsync() {
-        eventConsumer.startAsync(0);
+        eventStream.startAsync(0);
     }
 
     @Override
     public void awaitTermination(Duration timeout) {
         Objects.requireNonNull(timeout);
-        eventConsumer.awaitTermination(timeout);
+        eventStream.awaitTermination(timeout);
     }
 
     @Override
     public void awaitTermination() {
-        eventConsumer.awaitTermination();
+        eventStream.awaitTermination();
     }
 
     @Override
     public void setOrdered(boolean ordered) {
-        eventConsumer.setOrdered(ordered);
+        eventStream.setOrdered(ordered);
     }
 
     @Override
     public void setStartTime(Instant startTime) {
-        eventConsumer.setStartTime(startTime);
+        eventStream.setStartTime(startTime);
     }
 }