Add support for checkpoint aware flushing JEP-349-branch
authoregahlin
Fri, 23 Aug 2019 18:45:47 +0200
branchJEP-349-branch
changeset 57861 86022e34ba63
parent 57860 588a3f63efff
child 57862 84ef29ccac56
Add support for checkpoint aware flushing
src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Fri Aug 23 15:34:18 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Fri Aug 23 18:45:47 2019 +0200
@@ -327,6 +327,8 @@
     private final AccessControlContext accessControlContext;
     private final Thread thread;
     private final boolean active;
+    protected final Runnable flushOperation = () -> runFlushActions();
+
     // Updated by updateConfiguration()
     protected StreamConfiguration configuration = new StreamConfiguration();
 
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Fri Aug 23 15:34:18 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Fri Aug 23 18:45:47 2019 +0200
@@ -66,6 +66,7 @@
     private boolean resetEventCache;
     private long firstNanos = 0;
     private long lastNanos = Long.MAX_VALUE;
+    private Runnable flushOperation;
 
     public ChunkParser(RecordingInput input, boolean reuse) throws IOException {
        this(new ChunkHeader(input), null, 1000);
@@ -176,8 +177,8 @@
                 throw new IOException("Event can't have zero size");
             }
             long typeId = input.readLong();
-            // Skip metadata and constant pool events (id = 0, id = 1)
-            if (typeId > CONSTANT_POOL_TYPE_ID) {
+
+            if (typeId != 0) { // Not metadata event
                 Parser p = parsers.get(typeId);
                 if (p instanceof EventParser) {
                     EventParser ep = (EventParser) p;
@@ -187,12 +188,27 @@
                         return event;
                     }
                 }
+                if (typeId == 1 && flushOperation != null) { // checkpoint event
+                    parseCheckpoint();
+                }
             }
             input.position(pos + size);
         }
         return null;
     }
 
+    private void parseCheckpoint() throws IOException {
+        // Content has been parsed previously. This
+        // is for triggering flsuh
+        input.readLong(); // timestamp
+        input.readLong(); // duration
+        input.readLong(); // delta
+        boolean flush = input.readBoolean();
+        if (flush) {
+            flushOperation.run();
+        }
+    }
+
     private boolean awaitUpdatedHeader(long absoluteChunkEnd) throws IOException {
         if (Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO)) {
             Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
@@ -357,6 +373,10 @@
         this.reuse = resue;
     }
 
+    public void setFlushOperation(Runnable flushOperation) {
+        this.flushOperation = flushOperation;
+    }
+
     // Need to call updateEventParsers() for
     // change to take effect
     public void setOrdered(boolean ordered) {
@@ -373,11 +393,11 @@
         }
         this.firstNanos = firstNanos;
     }
+
     public void setLastNanos(long lastNanos) {
         this.lastNanos = lastNanos;
     }
 
-
     // Need to call updateEventParsers() for
     // change to take effect
     public void resetEventCache() {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri Aug 23 15:34:18 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri Aug 23 18:45:47 2019 +0200
@@ -90,6 +90,7 @@
                     while (!isClosed() && !chunkParser.isChunkFinished()) {
                         final StreamConfiguration c2 = configuration;
                         boolean ordered = c2.getOrdered();
+                        chunkParser.setFlushOperation(flushOperation);
                         chunkParser.setReuse(c2.getReuse());
                         chunkParser.setOrdered(ordered);
                         chunkParser.setFirstNanos(start);
@@ -103,7 +104,6 @@
                         } else {
                             awaitnewEvent = processUnordered(awaitnewEvent);
                         }
-                        runFlushActions();
                         if (segmentStart > end) {
                             close();
                             return;
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Fri Aug 23 15:34:18 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Fri Aug 23 18:45:47 2019 +0200
@@ -77,6 +77,7 @@
                 }
                 StreamConfiguration c2 = configuration;
                 boolean ordered = c2.getOrdered();
+                chunkParser.setFlushOperation(flushOperation);
                 chunkParser.setFirstNanos(start);
                 chunkParser.setLastNanos(end);
                 chunkParser.setReuse(c2.getReuse());
@@ -90,7 +91,6 @@
                 } else {
                     processUnordered();
                 }
-                runFlushActions();
                 if (chunkParser.isLastChunk()) {
                     return;
                 }