diff -r 80e1201f6c9a -r a0f39cc47387 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Fri Nov 22 09:06:35 2019 -0500 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Fri Nov 22 17:20:43 2019 +0100 @@ -35,6 +35,7 @@ import jdk.jfr.consumer.RecordedEvent; import jdk.jfr.internal.JVM; +import jdk.jfr.internal.PlatformRecording; import jdk.jfr.internal.Utils; import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration; @@ -43,12 +44,12 @@ * with chunk files. * */ -public final class EventDirectoryStream extends AbstractEventStream { +public class EventDirectoryStream extends AbstractEventStream { private final static Comparator EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator(); private final RepositoryFiles repositoryFiles; - private final boolean active; + private final PlatformRecording recording; private final FileAccess fileAccess; private ChunkParser currentParser; @@ -56,10 +57,10 @@ private RecordedEvent[] sortedCache; private int threadExclusionLevel = 0; - public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException { - super(acc, active); + public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, PlatformRecording recording) throws IOException { + super(acc, recording); this.fileAccess = Objects.requireNonNull(fileAccess); - this.active = active; + this.recording = recording; this.repositoryFiles = new RepositoryFiles(fileAccess, p); } @@ -104,7 +105,7 @@ Dispatcher disp = dispatcher(); Path path; - boolean validStartTime = active || disp.startTime != null; + boolean validStartTime = recording != null || disp.startTime != null; if (validStartTime) { path = repositoryFiles.firstPath(disp.startNanos); } else { @@ -139,8 +140,17 @@ return; } } + if (isLastChunk()) { + // Recording was stopped/closed externally, and no more data to process. + return; + } + if (repositoryFiles.hasFixedPath() && currentParser.isFinalChunk()) { + // JVM process exited/crashed, or repository migrated to an unknown location + return; + } if (isClosed()) { + // Stream was closed return; } long durationNanos = currentParser.getChunkDuration(); @@ -162,6 +172,13 @@ } } + private boolean isLastChunk() { + if (recording == null) { + return false; + } + return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos(); + } + private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException { if (sortedCache == null) { sortedCache = new RecordedEvent[100_000]; @@ -206,4 +223,5 @@ } } } + }