src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branchJEP-349-branch
changeset 58129 7b751fe181a5
parent 58020 f082177c5023
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri Sep 13 18:46:07 2019 +0200
@@ -32,6 +32,7 @@
 import java.util.Arrays;
 import java.util.Objects;
 
+import jdk.jfr.consumer.ChunkParser.ParserConfiguration;
 import jdk.jfr.internal.Utils;
 import jdk.jfr.internal.consumer.FileAccess;
 import jdk.jfr.internal.consumer.RecordingInput;
@@ -46,6 +47,7 @@
     private final RepositoryFiles repositoryFiles;
     private final boolean active;
     private final FileAccess fileAccess;
+
     private ChunkParser chunkParser;
     private long chunkStartNanos;
     private RecordedEvent[] sortedList;
@@ -60,7 +62,7 @@
     @Override
     public void close() {
         setClosed(true);
-        runCloseActions();
+        dispatcher().runCloseActions();
         repositoryFiles.close();
     }
 
@@ -76,11 +78,12 @@
 
     @Override
     protected void process() throws Exception {
-        StreamConfiguration c = configuration;
+        Dispatcher disp = dispatcher();
+
         Path path;
-        boolean validStartTime = active || c.getStartTime() != null;
+        boolean validStartTime = active || disp.startTime != null;
         if (validStartTime) {
-            path = repositoryFiles.firstPath(c.getStartNanos());
+            path = repositoryFiles.firstPath(disp.startNanos);
         } else {
             path = repositoryFiles.lastPath();
         }
@@ -89,28 +92,24 @@
         }
         chunkStartNanos = repositoryFiles.getTimestamp(path);
         try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
-            chunkParser = new ChunkParser(input, c.getReuse());
+            chunkParser = new ChunkParser(input, disp.parserConfiguration);
             long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
-            long filtertStart = validStartTime ? c.getStartNanos() : segmentStart;
-            long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE;
+            long filterStart = validStartTime ? disp.startNanos : segmentStart;
+            long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
+
             while (!isClosed()) {
                 boolean awaitnewEvent = false;
                 while (!isClosed() && !chunkParser.isChunkFinished()) {
-                    c = configuration;
-                    boolean ordered = c.getOrdered();
+                    disp = dispatcher();
+                    ParserConfiguration pc = disp.parserConfiguration;
+                    pc.filterStart = filterStart;
+                    pc.filterEnd = filterEnd;
+                    chunkParser.updateConfiguration(pc, true);
                     chunkParser.setFlushOperation(getFlushOperation());
-                    chunkParser.setReuse(c.getReuse());
-                    chunkParser.setOrdered(ordered);
-                    chunkParser.setFilterStart(filtertStart);
-                    chunkParser.setFilterEnd(filterEnd);
-                    chunkParser.resetEventCache();
-                    chunkParser.setParserFilter(c.getFilter());
-                    chunkParser.updateEventParsers();
-                    c.clearDispatchCache();
-                    if (ordered) {
-                        awaitnewEvent = processOrdered(c, awaitnewEvent);
+                    if (pc.ordered) {
+                        awaitnewEvent = processOrdered(disp, awaitnewEvent);
                     } else {
-                        awaitnewEvent = processUnordered(c, awaitnewEvent);
+                        awaitnewEvent = processUnordered(disp, awaitnewEvent);
                     }
                     if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
                         close();
@@ -135,7 +134,7 @@
         }
     }
 
-    private boolean processOrdered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
+    private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
         if (sortedList == null) {
             sortedList = new RecordedEvent[100_000];
         }
@@ -164,18 +163,18 @@
             Arrays.sort(sortedList, 0, index, END_TIME);
         }
         for (int i = 0; i < index; i++) {
-            dispatch(c, sortedList[i]);
+            c.dispatch(sortedList[i]);
         }
         return awaitNewEvents;
     }
 
-    private boolean processUnordered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
+    private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
         while (true) {
             RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
             if (e == null) {
                 return true;
             } else {
-                dispatch(c, e);
+                c.dispatch(e);
             }
         }
     }