--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Mon Aug 05 19:58:56 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Mon Aug 05 22:40:46 2019 +0200
@@ -403,4 +403,8 @@
});
resetEventCache = false;
}
+
+ public long getChunkDuration() {
+ return chunkHeader.getDurationNanos();
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon Aug 05 19:58:56 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon Aug 05 22:40:46 2019 +0200
@@ -68,16 +68,16 @@
chunkStartNanos = c1.getStartNanos();
Path path;
if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) {
- // TODO: Need to skip forward to the next event
- // For now, use the last chunk.
+ // TODO: Need to wait for next segment to arrive and then
+ // use first event, but this will do for.
path = repositoryFiles.lastPath();
} else {
- path = repositoryFiles.nextPath(chunkStartNanos);
+ path = repositoryFiles.firstPath(chunkStartNanos);
}
if (path == null) { // closed
return;
}
- chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
+ chunkStartNanos = repositoryFiles.getTimestamp(path);
try (RecordingInput input = new RecordingInput(path.toFile())) {
chunkParser = new ChunkParser(input, c1.getReuse());
while (!isClosed()) {
@@ -102,11 +102,12 @@
if (isClosed()) {
return;
}
- path = repositoryFiles.nextPath(chunkStartNanos);
+ long durationNanos = chunkParser.getChunkDuration();
+ path = repositoryFiles.nextPath(chunkStartNanos + durationNanos);
if (path == null) {
return; // stream closed
}
- chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
+ chunkStartNanos = repositoryFiles.getTimestamp(path);
input.setFile(path);
chunkParser = chunkParser.newChunkParser();
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Mon Aug 05 19:58:56 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Mon Aug 05 22:40:46 2019 +0200
@@ -34,7 +34,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
@@ -61,23 +60,43 @@
}
public Path lastPath() {
- return nextPath(-1);
+ // Wait for chunks
+ while (!closed) {
+ try {
+ if (updatePaths(repository)) {
+ break;
+ }
+ } catch (IOException e) {
+ // ignore, not yet available
+ }
+ }
+ if (closed) {
+ return null;
+ }
+ // Pick the last
+ return pathSet.lastEntry().getValue();
+ }
+
+ public Path firstPath(long startTimeNanos) {
+ return path(startTimeNanos, true);
}
public Path nextPath(long startTimeNanos) {
+ return path(startTimeNanos, false);
+ }
+
+ private Path path(long timestamp, boolean first) {
while (!closed) {
- if (startTimeNanos == -1) {
- Entry<Long, Path> e = pathSet.lastEntry();
- if (e != null) {
- return e.getValue();
- }
+ Long time = timestamp;
+ if (first) {
+ // Pick closest chunk before timestamp
+ time = pathSet.floorKey(timestamp);
}
- Long f = pathSet.floorKey(startTimeNanos);
- if (f != null) {
- SortedMap<Long, Path> after = pathSet.tailMap(f);
+ if (time != null) {
+ SortedMap<Long, Path> after = pathSet.tailMap(time);
if (!after.isEmpty()) {
Path path = after.get(after.firstKey());
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + timestamp);
return path;
}
}