--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Fri Aug 23 15:34:18 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Fri Aug 23 18:45:47 2019 +0200
@@ -327,6 +327,8 @@
private final AccessControlContext accessControlContext;
private final Thread thread;
private final boolean active;
+ protected final Runnable flushOperation = () -> runFlushActions();
+
// Updated by updateConfiguration()
protected StreamConfiguration configuration = new StreamConfiguration();
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri Aug 23 15:34:18 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri Aug 23 18:45:47 2019 +0200
@@ -66,6 +66,7 @@
private boolean resetEventCache;
private long firstNanos = 0;
private long lastNanos = Long.MAX_VALUE;
+ private Runnable flushOperation;
public ChunkParser(RecordingInput input, boolean reuse) throws IOException {
this(new ChunkHeader(input), null, 1000);
@@ -176,8 +177,8 @@
throw new IOException("Event can't have zero size");
}
long typeId = input.readLong();
- // Skip metadata and constant pool events (id = 0, id = 1)
- if (typeId > CONSTANT_POOL_TYPE_ID) {
+
+ if (typeId != 0) { // Not metadata event
Parser p = parsers.get(typeId);
if (p instanceof EventParser) {
EventParser ep = (EventParser) p;
@@ -187,12 +188,27 @@
return event;
}
}
+ if (typeId == 1 && flushOperation != null) { // checkpoint event
+ parseCheckpoint();
+ }
}
input.position(pos + size);
}
return null;
}
+ private void parseCheckpoint() throws IOException {
+ // Content has been parsed previously. This
+ // is for triggering flsuh
+ input.readLong(); // timestamp
+ input.readLong(); // duration
+ input.readLong(); // delta
+ boolean flush = input.readBoolean();
+ if (flush) {
+ flushOperation.run();
+ }
+ }
+
private boolean awaitUpdatedHeader(long absoluteChunkEnd) throws IOException {
if (Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO)) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
@@ -357,6 +373,10 @@
this.reuse = resue;
}
+ public void setFlushOperation(Runnable flushOperation) {
+ this.flushOperation = flushOperation;
+ }
+
// Need to call updateEventParsers() for
// change to take effect
public void setOrdered(boolean ordered) {
@@ -373,11 +393,11 @@
}
this.firstNanos = firstNanos;
}
+
public void setLastNanos(long lastNanos) {
this.lastNanos = lastNanos;
}
-
// Need to call updateEventParsers() for
// change to take effect
public void resetEventCache() {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Aug 23 15:34:18 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Aug 23 18:45:47 2019 +0200
@@ -90,6 +90,7 @@
while (!isClosed() && !chunkParser.isChunkFinished()) {
final StreamConfiguration c2 = configuration;
boolean ordered = c2.getOrdered();
+ chunkParser.setFlushOperation(flushOperation);
chunkParser.setReuse(c2.getReuse());
chunkParser.setOrdered(ordered);
chunkParser.setFirstNanos(start);
@@ -103,7 +104,6 @@
} else {
awaitnewEvent = processUnordered(awaitnewEvent);
}
- runFlushActions();
if (segmentStart > end) {
close();
return;
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Aug 23 15:34:18 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Aug 23 18:45:47 2019 +0200
@@ -77,6 +77,7 @@
}
StreamConfiguration c2 = configuration;
boolean ordered = c2.getOrdered();
+ chunkParser.setFlushOperation(flushOperation);
chunkParser.setFirstNanos(start);
chunkParser.setLastNanos(end);
chunkParser.setReuse(c2.getReuse());
@@ -90,7 +91,6 @@
} else {
processUnordered();
}
- runFlushActions();
if (chunkParser.isLastChunk()) {
return;
}