--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Tue Sep 17 19:37:49 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Wed Sep 18 03:45:46 2019 +0200
@@ -50,9 +50,9 @@
private final boolean active;
private final FileAccess fileAccess;
- private ChunkParser chunkParser;
- private long chunkStartNanos;
- private RecordedEvent[] sortedList;
+ private ChunkParser currentParser;
+ private long currentChunkStartNanos;
+ private RecordedEvent[] sortedCache;
public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
super(acc, active);
@@ -92,28 +92,28 @@
if (path == null) { // closed
return;
}
- chunkStartNanos = repositoryFiles.getTimestamp(path);
+ currentChunkStartNanos = repositoryFiles.getTimestamp(path);
try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
- chunkParser = new ChunkParser(input, disp.parserConfiguration);
- long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
+ currentParser = new ChunkParser(input, disp.parserConfiguration);
+ long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration();
long filterStart = validStartTime ? disp.startNanos : segmentStart;
long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
while (!isClosed()) {
boolean awaitnewEvent = false;
- while (!isClosed() && !chunkParser.isChunkFinished()) {
+ while (!isClosed() && !currentParser.isChunkFinished()) {
disp = dispatcher();
ParserConfiguration pc = disp.parserConfiguration;
pc.filterStart = filterStart;
pc.filterEnd = filterEnd;
- chunkParser.updateConfiguration(pc, true);
- chunkParser.setFlushOperation(getFlushOperation());
- if (pc.ordered) {
+ currentParser.updateConfiguration(pc, true);
+ currentParser.setFlushOperation(getFlushOperation());
+ if (pc.isOrdered()) {
awaitnewEvent = processOrdered(disp, awaitnewEvent);
} else {
awaitnewEvent = processUnordered(disp, awaitnewEvent);
}
- if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
+ if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
close();
return;
}
@@ -122,14 +122,14 @@
if (isClosed()) {
return;
}
- long durationNanos = chunkParser.getChunkDuration();
- path = repositoryFiles.nextPath(chunkStartNanos + durationNanos);
+ long durationNanos = currentParser.getChunkDuration();
+ path = repositoryFiles.nextPath(currentChunkStartNanos + durationNanos);
if (path == null) {
return; // stream closed
}
- chunkStartNanos = repositoryFiles.getTimestamp(path);
+ currentChunkStartNanos = repositoryFiles.getTimestamp(path);
input.setFile(path);
- chunkParser = chunkParser.newChunkParser();
+ currentParser = currentParser.newChunkParser();
// TODO: Optimization. No need filter when we reach new chunk
// Could set start = 0;
}
@@ -137,12 +137,12 @@
}
private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
- if (sortedList == null) {
- sortedList = new RecordedEvent[100_000];
+ if (sortedCache == null) {
+ sortedCache = new RecordedEvent[100_000];
}
int index = 0;
while (true) {
- RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+ RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
if (e == null) {
// wait for new event with next call to
// readStreamingEvent()
@@ -150,29 +150,29 @@
break;
}
awaitNewEvents = false;
- if (index == sortedList.length) {
- sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
+ if (index == sortedCache.length) {
+ sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2);
}
- sortedList[index++] = e;
+ sortedCache[index++] = e;
}
// no events found
- if (index == 0 && chunkParser.isChunkFinished()) {
+ if (index == 0 && currentParser.isChunkFinished()) {
return awaitNewEvents;
}
// at least 2 events, sort them
if (index > 1) {
- Arrays.sort(sortedList, 0, index, EVENT_COMPARATOR);
+ Arrays.sort(sortedCache, 0, index, EVENT_COMPARATOR);
}
for (int i = 0; i < index; i++) {
- c.dispatch(sortedList[i]);
+ c.dispatch(sortedCache[i]);
}
return awaitNewEvents;
}
private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
while (true) {
- RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+ RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
if (e == null) {
return true;
} else {