--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Fri Nov 29 15:37:13 2019 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Fri Nov 29 17:31:01 2019 +0100
@@ -105,8 +105,8 @@
}
protected void processRecursionSafe() throws IOException {
+ Dispatcher lastDisp = null;
Dispatcher disp = dispatcher();
-
Path path;
boolean validStartTime = recording != null || disp.startTime != null;
if (validStartTime) {
@@ -125,18 +125,20 @@
long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
while (!isClosed()) {
- boolean awaitnewEvent = false;
while (!isClosed() && !currentParser.isChunkFinished()) {
disp = dispatcher();
- ParserConfiguration pc = disp.parserConfiguration;
- pc.filterStart = filterStart;
- pc.filterEnd = filterEnd;
- currentParser.updateConfiguration(pc, true);
- currentParser.setFlushOperation(getFlushOperation());
- if (pc.isOrdered()) {
- awaitnewEvent = processOrdered(disp, awaitnewEvent);
+ if (disp != lastDisp) {
+ ParserConfiguration pc = disp.parserConfiguration;
+ pc.filterStart = filterStart;
+ pc.filterEnd = filterEnd;
+ currentParser.updateConfiguration(pc, true);
+ currentParser.setFlushOperation(getFlushOperation());
+ lastDisp = disp;
+ }
+ if (disp.parserConfiguration.isOrdered()) {
+ processOrdered(disp);
} else {
- awaitnewEvent = processUnordered(disp, awaitnewEvent);
+ processUnordered(disp);
}
if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
close();
@@ -182,29 +184,24 @@
return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos();
}
- private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
+ private void processOrdered(Dispatcher c) throws IOException {
if (sortedCache == null) {
sortedCache = new RecordedEvent[100_000];
}
int index = 0;
while (true) {
- RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
+ RecordedEvent e = currentParser.readStreamingEvent();
if (e == null) {
- // wait for new event with next call to
- // readStreamingEvent()
- awaitNewEvents = true;
break;
}
- awaitNewEvents = false;
if (index == sortedCache.length) {
sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2);
}
sortedCache[index++] = e;
}
-
// no events found
if (index == 0 && currentParser.isChunkFinished()) {
- return awaitNewEvents;
+ return;
}
// at least 2 events, sort them
if (index > 1) {
@@ -213,12 +210,12 @@
for (int i = 0; i < index; i++) {
c.dispatch(sortedCache[i]);
}
- return awaitNewEvents;
+ return;
}
- private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
+ private boolean processUnordered(Dispatcher c) throws IOException {
while (true) {
- RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
+ RecordedEvent e = currentParser.readStreamingEvent();
if (e == null) {
return true;
} else {