src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branchJEP-349-branch
changeset 57449 099789ceff7d
parent 57433 83e4343a6984
child 57452 6fabe73e5d9a
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Thu Jun 27 10:41:01 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Wed Jul 03 22:51:44 2019 +0200
@@ -35,7 +35,6 @@
 import java.util.Objects;
 import java.util.function.Consumer;
 
-import jdk.jfr.internal.consumer.EventConsumer;
 import jdk.jfr.internal.consumer.RecordingInput;
 import jdk.jfr.internal.consumer.RepositoryFiles;
 
@@ -46,23 +45,32 @@
  */
 final class EventDirectoryStream implements EventStream {
 
-    static final class ParserConsumer extends EventConsumer {
+    static final class DirectoryConsumer extends EventConsumer {
 
         private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
         private static final int DEFAULT_ARRAY_SIZE = 10_000;
         private final RepositoryFiles repositoryFiles;
         private ChunkParser chunkParser;
         private RecordedEvent[] sortedList;
+        protected long chunkStartNanos;
 
-        public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
+        public DirectoryConsumer(AccessControlContext acc, Path p) throws IOException {
             super(acc);
             repositoryFiles = new RepositoryFiles(p);
         }
 
         @Override
         public void process() throws IOException {
-            Path path = repositoryFiles.nextPath(startNanos);
-            startNanos = repositoryFiles.getTimestamp(path) + 1;
+            chunkStartNanos = startNanos;
+            Path path;
+            if (startTime == EventConsumer.NEXT_EVENT) {
+                // TODO: Need to skip forward to the next event
+                // For now, use the last chunk.
+                path = repositoryFiles.lastPath();
+            } else {
+                path = repositoryFiles.nextPath(chunkStartNanos);
+            }
+            chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
             try (RecordingInput input = new RecordingInput(path.toFile())) {
                 chunkParser = new ChunkParser(input, this.reuse);
                 while (!isClosed()) {
@@ -81,11 +89,11 @@
                         runFlushActions();
                     }
 
-                    path = repositoryFiles.nextPath(startNanos);
+                    path = repositoryFiles.nextPath(chunkStartNanos);
                     if (path == null) {
                         return; // stream closed
                     }
-                    startNanos = repositoryFiles.getTimestamp(path) + 1;
+                    chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
                     input.setFile(path);
                     chunkParser = chunkParser.newChunkParser();
                 }
@@ -147,8 +155,9 @@
 
     private final EventConsumer eventConsumer;
 
-    public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException {
-        eventConsumer = new ParserConsumer(acc, p);
+    public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException {
+        eventConsumer = new DirectoryConsumer(acc, p);
+        eventConsumer.startTime = startTime;
     }
 
     public void close() {
@@ -223,4 +232,9 @@
     public void setOrdered(boolean ordered) {
         eventConsumer.setOrdered(ordered);
     }
+
+    @Override
+    public void setStartTime(Instant startTime) {
+        eventConsumer.setStartTime(startTime);
+    }
 }