--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Jun 27 10:41:01 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Wed Jul 03 22:51:44 2019 +0200
@@ -35,7 +35,6 @@
import java.util.Objects;
import java.util.function.Consumer;
-import jdk.jfr.internal.consumer.EventConsumer;
import jdk.jfr.internal.consumer.RecordingInput;
import jdk.jfr.internal.consumer.RepositoryFiles;
@@ -46,23 +45,32 @@
*/
final class EventDirectoryStream implements EventStream {
- static final class ParserConsumer extends EventConsumer {
+ static final class DirectoryConsumer extends EventConsumer {
private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
private static final int DEFAULT_ARRAY_SIZE = 10_000;
private final RepositoryFiles repositoryFiles;
private ChunkParser chunkParser;
private RecordedEvent[] sortedList;
+ protected long chunkStartNanos;
- public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
+ public DirectoryConsumer(AccessControlContext acc, Path p) throws IOException {
super(acc);
repositoryFiles = new RepositoryFiles(p);
}
@Override
public void process() throws IOException {
- Path path = repositoryFiles.nextPath(startNanos);
- startNanos = repositoryFiles.getTimestamp(path) + 1;
+ chunkStartNanos = startNanos;
+ Path path;
+ if (startTime == EventConsumer.NEXT_EVENT) {
+ // TODO: Need to skip forward to the next event
+ // For now, use the last chunk.
+ path = repositoryFiles.lastPath();
+ } else {
+ path = repositoryFiles.nextPath(chunkStartNanos);
+ }
+ chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
try (RecordingInput input = new RecordingInput(path.toFile())) {
chunkParser = new ChunkParser(input, this.reuse);
while (!isClosed()) {
@@ -81,11 +89,11 @@
runFlushActions();
}
- path = repositoryFiles.nextPath(startNanos);
+ path = repositoryFiles.nextPath(chunkStartNanos);
if (path == null) {
return; // stream closed
}
- startNanos = repositoryFiles.getTimestamp(path) + 1;
+ chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
input.setFile(path);
chunkParser = chunkParser.newChunkParser();
}
@@ -147,8 +155,9 @@
private final EventConsumer eventConsumer;
- public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException {
- eventConsumer = new ParserConsumer(acc, p);
+ public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException {
+ eventConsumer = new DirectoryConsumer(acc, p);
+ eventConsumer.startTime = startTime;
}
public void close() {
@@ -223,4 +232,9 @@
public void setOrdered(boolean ordered) {
eventConsumer.setOrdered(ordered);
}
+
+ @Override
+ public void setStartTime(Instant startTime) {
+ eventConsumer.setStartTime(startTime);
+ }
}