--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Fri Nov 22 09:06:35 2019 -0500
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Fri Nov 22 17:20:43 2019 +0100
@@ -35,6 +35,7 @@
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.internal.JVM;
+import jdk.jfr.internal.PlatformRecording;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration;
@@ -43,12 +44,12 @@
* with chunk files.
*
*/
-public final class EventDirectoryStream extends AbstractEventStream {
+public class EventDirectoryStream extends AbstractEventStream {
private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator();
private final RepositoryFiles repositoryFiles;
- private final boolean active;
+ private final PlatformRecording recording;
private final FileAccess fileAccess;
private ChunkParser currentParser;
@@ -56,10 +57,10 @@
private RecordedEvent[] sortedCache;
private int threadExclusionLevel = 0;
- public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
- super(acc, active);
+ public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, PlatformRecording recording) throws IOException {
+ super(acc, recording);
this.fileAccess = Objects.requireNonNull(fileAccess);
- this.active = active;
+ this.recording = recording;
this.repositoryFiles = new RepositoryFiles(fileAccess, p);
}
@@ -104,7 +105,7 @@
Dispatcher disp = dispatcher();
Path path;
- boolean validStartTime = active || disp.startTime != null;
+ boolean validStartTime = recording != null || disp.startTime != null;
if (validStartTime) {
path = repositoryFiles.firstPath(disp.startNanos);
} else {
@@ -139,8 +140,17 @@
return;
}
}
+ if (isLastChunk()) {
+ // Recording was stopped/closed externally, and no more data to process.
+ return;
+ }
+ if (repositoryFiles.hasFixedPath() && currentParser.isFinalChunk()) {
+ // JVM process exited/crashed, or repository migrated to an unknown location
+ return;
+ }
if (isClosed()) {
+ // Stream was closed
return;
}
long durationNanos = currentParser.getChunkDuration();
@@ -162,6 +172,13 @@
}
}
+ private boolean isLastChunk() {
+ if (recording == null) {
+ return false;
+ }
+ return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos();
+ }
+
private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
if (sortedCache == null) {
sortedCache = new RecordedEvent[100_000];
@@ -206,4 +223,5 @@
}
}
}
+
}