Use one parser per recording stream / thread JEP-349-branch
authoregahlin
Tue, 25 Jun 2019 12:03:01 +0200
branchJEP-349-branch
changeset 57425 1da8552f0b59
parent 57386 acdd0dbe37ee
child 57427 596f839ce88f
Use one parser per recording stream / thread
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Mon Jun 03 16:21:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Tue Jun 25 12:03:01 2019 +0200
@@ -66,20 +66,20 @@
             this.repostory = repostory;
         }
 
-        Path nextPath(Path previous) {
-            long startTimeNanos = 0L;
-            if (previous != null) {
-                startTimeNanos = pathLookup.get(previous);
-            }
-            SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
-            if (!after.isEmpty()) {
-                Path path = after.get(after.firstKey());
-                Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
-                return path;
-            }
+        long getTimestamp(Path p) {
+            return  pathLookup.get(p);
+        }
+
+        Path nextPath(long startTimeNanos) {
             while (!closed) {
+                SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
+                if (!after.isEmpty()) {
+                    Path path = after.get(after.firstKey());
+                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
+                    return path;
+                }
                 try {
-                    if (updatePaths()) {
+                    if (updatePaths(repostory)) {
                         continue;
                     }
                 } catch (IOException e) {
@@ -90,7 +90,9 @@
                     // and retry later.
                 }
                 try {
-                    pathSet.wait(1000);
+                    synchronized (pathSet) {
+                        pathSet.wait(1000);
+                    }
                 } catch (InterruptedException e) {
                     // ignore
                 }
@@ -98,15 +100,18 @@
             return null;
         }
 
-        private boolean updatePaths() throws IOException {
+        private boolean updatePaths(Path repo) throws IOException {
+            if (repo == null) {
+                repo = Repository.getRepository().getRepositoryPath().toPath();
+            }
             boolean foundNew = false;
             List<Path> added = new ArrayList<>();
             Set<Path> current = new HashSet<>();
-            if (!Files.exists(repostory)) {
+            if (!Files.exists(repo)) {
                 // Repository removed, probably due to shutdown
                 return true;
             }
-            try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repostory, "*.jfr")) {
+            try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo, "*.jfr")) {
                 for (Path p : dirStream) {
                     if (!pathLookup.containsKey(p)) {
                         added.add(p);
@@ -150,8 +155,8 @@
         }
 
         public void close() {
-            this.closed = true;
             synchronized (pathSet) {
+                this.closed = true;
                 pathSet.notify();
             }
         }
@@ -165,7 +170,7 @@
         private ChunkParser chunkParser;
         private boolean reuse = true;
         private RecordedEvent[] sortedList;
-        private boolean ordered;
+        private boolean ordered = true;
 
         public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
             super(acc);
@@ -174,96 +179,78 @@
 
         @Override
         public void process() throws IOException {
-            Path path = repositoryFiles.nextPath(null);
+            Path path = repositoryFiles.nextPath(startNanos);
+            startNanos = repositoryFiles.getTimestamp(path) + 1;
             try (RecordingInput input = new RecordingInput(path.toFile())) {
-                chunkParser = new ChunkParser(input, reuse);
                 while (!isClosed()) {
-                    boolean reuse = this.reuse;
-                    boolean ordered = this.ordered;
-                    chunkParser.setReuse(reuse);
-                    chunkParser.setOrdered(ordered);
-                    chunkParser.resetEventCache();
-                    chunkParser.updateEventParsers();
+                    // chunkParser = chunkParser.nextChunkParser();
+                    chunkParser = new ChunkParser(input, this.reuse);
+                    boolean awaitnewEvent = false;
+                    while (!isClosed() && !chunkParser.isChunkFinished()) {
+                        chunkParser.setReuse(this.reuse);
+                        chunkParser.setOrdered(this.ordered);
+                        chunkParser.resetEventCache();
+                        chunkParser.updateEventParsers();
+                        if (ordered) {
+                            awaitnewEvent = processOrdered(awaitnewEvent);
+                        } else {
+                            awaitnewEvent = processUnordered(awaitnewEvent);
+                        }
+                        runFlushActions();
+                    }
 
-                    if (ordered) {
-                        processOrdered2();
-                    } else {
-                        processUnordered();
-                    }
-                    if (chunkParser.isLastChunk()) {
-                        return;
-                    }
-                    path = repositoryFiles.nextPath(path);
-                    input.newFile(path);
+                    path = repositoryFiles.nextPath(startNanos);
+                    startNanos = repositoryFiles.getTimestamp(path) + 1;
+                    input.setFile(path);
                 }
             }
         }
 
-        private void processOrdered2() throws IOException {
+        private boolean processOrdered(boolean awaitNewEvents) throws IOException {
             if (sortedList == null) {
                 sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
             }
+            int index = 0;
             while (true) {
-                boolean reuse = this.reuse;
-                boolean ordered = this.ordered;
-                chunkParser.setReuse(reuse);
-                chunkParser.setOrdered(ordered);
-                chunkParser.resetEventCache();
-                chunkParser.updateEventParsers();
-                boolean awaitNewEvents = false;
-                int index = 0;
-                while (true) {
-                    RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
-                    if (e == null) {
-                        // wait for new event with next call to
-                        // readStreamingEvent()
-                        awaitNewEvents = true;
-                        break;
-                    }
-                    awaitNewEvents = false;
-                    if (index == sortedList.length) {
-                        sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
-                    }
-                    sortedList[index++] = e;
+                RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+                if (e == null) {
+                    // wait for new event with next call to
+                    // readStreamingEvent()
+                    awaitNewEvents = true;
+                    break;
+                }
+                awaitNewEvents = false;
+                if (index == sortedList.length) {
+                    sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
                 }
+                sortedList[index++] = e;
+            }
 
-                // no events found
-                if (index == 0 && chunkParser.isChunkFinished()) {
-                    return;
-                }
-                // at least 2 events, sort them
-                if (index > 1) {
-                    Arrays.sort(sortedList, 0, index, END_TIME);
-                }
-                for (int i = 0; i < index; i++) {
-                    dispatch(sortedList[i]);
-                }
-                if (chunkParser.isChunkFinished()) {
-                    return;
-                }
-                runFlushActions();
+            // no events found
+            if (index == 0 && chunkParser.isChunkFinished()) {
+                return awaitNewEvents;
             }
+            // at least 2 events, sort them
+            if (index > 1) {
+                Arrays.sort(sortedList, 0, index, END_TIME);
+            }
+            for (int i = 0; i < index; i++) {
+                dispatch(sortedList[i]);
+            }
+            return awaitNewEvents;
         }
 
-        private void processUnordered() throws IOException {
-            boolean awaitNewEvents = false;
+        private boolean processUnordered(boolean awaitNewEvents) throws IOException {
             while (true) {
-                boolean reuse = this.reuse;
-                boolean ordered = this.ordered;
-                chunkParser.setReuse(reuse);
-                chunkParser.setOrdered(ordered);
-                chunkParser.resetEventCache();
-                chunkParser.updateEventParsers();
-                while (true) {
-                    RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
-                    if (e == null) {
-                        awaitNewEvents = true;
-                        break;
-                    }
+                RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+                if (e == null) {
+                    awaitNewEvents = true;
+                    break;
+                } else {
                     dispatch(e);
                 }
-                runFlushActions();
             }
+            return awaitNewEvents;
         }
 
         public void setReuse(boolean reuse) {
@@ -354,9 +341,8 @@
 
     private final EventConsumer eventConsumer;
 
-    public EventDirectoryStream(AccessControlContext acc) throws IOException {
-        // Path p = Repository.getRepository().getRepositoryPath().toPath();
-        eventConsumer = new SharedParserConsumer(acc);
+    public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException {
+        eventConsumer = new ParserConsumer(acc, p);
     }
 
     public void close() {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Mon Jun 03 16:21:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Tue Jun 25 12:03:01 2019 +0200
@@ -81,7 +81,7 @@
         this.recording = new Recording();
         this.recording.setFlushInterval(Duration.ofMillis(1000));
         try {
-            this.stream = new EventDirectoryStream(acc);
+            this.stream = new EventDirectoryStream(acc, null);
         } catch (IOException ioe) {
             throw new IllegalStateException(ioe.getMessage());
         }
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java	Mon Jun 03 16:21:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java	Tue Jun 25 12:03:01 2019 +0200
@@ -65,8 +65,8 @@
         }
     }
 
-    private final RandomAccessFile file;
-    private final String filename;
+    private RandomAccessFile file;
+    private String filename;
     private Block currentBlock = new Block();
     private Block previousBlock = new Block();
     private long position;
@@ -75,8 +75,16 @@
 
     public RecordingInput(File f, int blockSize) throws IOException {
         this.blockSize = blockSize;
+        initialize(f);
+    }
+
+    private void initialize(File f) throws IOException {
         this.filename = f.getAbsolutePath().toString();
         this.file = new RandomAccessFile(f, "r");
+        this.position = 0;
+        this.size = -1;
+        this.currentBlock.reset();
+        this.previousBlock.reset();
         if (f.length() < 8) {
             throw new IOException("Not a valid Flight Recorder file. File length is only " + f.length() + " bytes.");
         }
@@ -335,19 +343,14 @@
 
     // Purpose of this method is to reuse block cache from a
     // previous RecordingInput
-    public RecordingInput newFile(Path path) throws IOException  {
+    public void setFile(Path path) throws IOException  {
         try {
-            close();
+            file.close();
         } catch (IOException e) {
             // perhaps deleted
         }
-        RecordingInput input = new RecordingInput(path.toFile(), this.blockSize);
-        input.currentBlock = this.currentBlock;
-        input.currentBlock.reset();
-        input.previousBlock = this.previousBlock;
-        input.previousBlock.reset();
-
-        return input;
+        file = null;
+        initialize(path.toFile());
     }
 
 }