src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java
changeset 59226 a0f39cc47387
parent 58863 c16ac7a2eba4
child 59310 72f3dd43dd28
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java	Fri Nov 22 09:06:35 2019 -0500
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java	Fri Nov 22 17:20:43 2019 +0100
@@ -35,6 +35,7 @@
 
 import jdk.jfr.consumer.RecordedEvent;
 import jdk.jfr.internal.JVM;
+import jdk.jfr.internal.PlatformRecording;
 import jdk.jfr.internal.Utils;
 import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration;
 
@@ -43,12 +44,12 @@
  * with chunk files.
  *
  */
-public final class EventDirectoryStream extends AbstractEventStream {
+public class EventDirectoryStream extends AbstractEventStream {
 
     private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator();
 
     private final RepositoryFiles repositoryFiles;
-    private final boolean active;
+    private final PlatformRecording recording;
     private final FileAccess fileAccess;
 
     private ChunkParser currentParser;
@@ -56,10 +57,10 @@
     private RecordedEvent[] sortedCache;
     private int threadExclusionLevel = 0;
 
-    public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
-        super(acc, active);
+    public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, PlatformRecording recording) throws IOException {
+        super(acc, recording);
         this.fileAccess = Objects.requireNonNull(fileAccess);
-        this.active = active;
+        this.recording = recording;
         this.repositoryFiles = new RepositoryFiles(fileAccess, p);
     }
 
@@ -104,7 +105,7 @@
         Dispatcher disp = dispatcher();
 
         Path path;
-        boolean validStartTime = active || disp.startTime != null;
+        boolean validStartTime = recording != null || disp.startTime != null;
         if (validStartTime) {
             path = repositoryFiles.firstPath(disp.startNanos);
         } else {
@@ -139,8 +140,17 @@
                         return;
                     }
                 }
+                if (isLastChunk()) {
+                    // Recording was stopped/closed externally, and no more data to process.
+                    return;
+                }
 
+                if (repositoryFiles.hasFixedPath() && currentParser.isFinalChunk()) {
+                    // JVM process exited/crashed, or repository migrated to an unknown location
+                    return;
+                }
                 if (isClosed()) {
+                    // Stream was closed
                     return;
                 }
                 long durationNanos = currentParser.getChunkDuration();
@@ -162,6 +172,13 @@
         }
     }
 
+    private boolean isLastChunk() {
+        if (recording == null) {
+            return false;
+        }
+        return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos();
+    }
+
     private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
         if (sortedCache == null) {
             sortedCache = new RecordedEvent[100_000];
@@ -206,4 +223,5 @@
             }
         }
     }
+
 }