--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Sep 13 18:46:07 2019 +0200
@@ -32,6 +32,7 @@
import java.util.Arrays;
import java.util.Objects;
+import jdk.jfr.consumer.ChunkParser.ParserConfiguration;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.RecordingInput;
@@ -46,6 +47,7 @@
private final RepositoryFiles repositoryFiles;
private final boolean active;
private final FileAccess fileAccess;
+
private ChunkParser chunkParser;
private long chunkStartNanos;
private RecordedEvent[] sortedList;
@@ -60,7 +62,7 @@
@Override
public void close() {
setClosed(true);
- runCloseActions();
+ dispatcher().runCloseActions();
repositoryFiles.close();
}
@@ -76,11 +78,12 @@
@Override
protected void process() throws Exception {
- StreamConfiguration c = configuration;
+ Dispatcher disp = dispatcher();
+
Path path;
- boolean validStartTime = active || c.getStartTime() != null;
+ boolean validStartTime = active || disp.startTime != null;
if (validStartTime) {
- path = repositoryFiles.firstPath(c.getStartNanos());
+ path = repositoryFiles.firstPath(disp.startNanos);
} else {
path = repositoryFiles.lastPath();
}
@@ -89,28 +92,24 @@
}
chunkStartNanos = repositoryFiles.getTimestamp(path);
try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
- chunkParser = new ChunkParser(input, c.getReuse());
+ chunkParser = new ChunkParser(input, disp.parserConfiguration);
long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
- long filtertStart = validStartTime ? c.getStartNanos() : segmentStart;
- long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE;
+ long filterStart = validStartTime ? disp.startNanos : segmentStart;
+ long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
+
while (!isClosed()) {
boolean awaitnewEvent = false;
while (!isClosed() && !chunkParser.isChunkFinished()) {
- c = configuration;
- boolean ordered = c.getOrdered();
+ disp = dispatcher();
+ ParserConfiguration pc = disp.parserConfiguration;
+ pc.filterStart = filterStart;
+ pc.filterEnd = filterEnd;
+ chunkParser.updateConfiguration(pc, true);
chunkParser.setFlushOperation(getFlushOperation());
- chunkParser.setReuse(c.getReuse());
- chunkParser.setOrdered(ordered);
- chunkParser.setFilterStart(filtertStart);
- chunkParser.setFilterEnd(filterEnd);
- chunkParser.resetEventCache();
- chunkParser.setParserFilter(c.getFilter());
- chunkParser.updateEventParsers();
- c.clearDispatchCache();
- if (ordered) {
- awaitnewEvent = processOrdered(c, awaitnewEvent);
+ if (pc.ordered) {
+ awaitnewEvent = processOrdered(disp, awaitnewEvent);
} else {
- awaitnewEvent = processUnordered(c, awaitnewEvent);
+ awaitnewEvent = processUnordered(disp, awaitnewEvent);
}
if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
close();
@@ -135,7 +134,7 @@
}
}
- private boolean processOrdered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
+ private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[100_000];
}
@@ -164,18 +163,18 @@
Arrays.sort(sortedList, 0, index, END_TIME);
}
for (int i = 0; i < index; i++) {
- dispatch(c, sortedList[i]);
+ c.dispatch(sortedList[i]);
}
return awaitNewEvents;
}
- private boolean processUnordered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
+ private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
while (true) {
RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
if (e == null) {
return true;
} else {
- dispatch(c, e);
+ c.dispatch(e);
}
}
}