src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branchJEP-349-branch
changeset 57386 acdd0dbe37ee
parent 57385 7d9d4f629f6e
child 57425 1da8552f0b59
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Mon Jun 03 16:21:47 2019 +0200
@@ -26,24 +26,268 @@
 package jdk.jfr.consumer;
 
 import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.security.AccessControlContext;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.function.Consumer;
 
+import jdk.jfr.internal.LogLevel;
+import jdk.jfr.internal.LogTag;
+import jdk.jfr.internal.Logger;
+import jdk.jfr.internal.Repository;
+import jdk.jfr.internal.consumer.ChunkHeader;
 import jdk.jfr.internal.consumer.EventConsumer;
+import jdk.jfr.internal.consumer.RecordingInput;
 
 final class EventDirectoryStream implements EventStream {
 
-    private static class EventSetConsumer extends EventConsumer {
+    private static final class RepositoryFiles {
+        private final Path repostory;
+        private final SortedMap<Long, Path> pathSet = new TreeMap<>();
+        private final Map<Path, Long> pathLookup = new HashMap<>();
+        private volatile boolean closed;
+
+        public RepositoryFiles(Path repostory) {
+            this.repostory = repostory;
+        }
+
+        Path nextPath(Path previous) {
+            long startTimeNanos = 0L;
+            if (previous != null) {
+                startTimeNanos = pathLookup.get(previous);
+            }
+            SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
+            if (!after.isEmpty()) {
+                Path path = after.get(after.firstKey());
+                Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
+                return path;
+            }
+            while (!closed) {
+                try {
+                    if (updatePaths()) {
+                        continue;
+                    }
+                } catch (IOException e) {
+                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during repository file scan " + e.getMessage());
+                    // This can happen if a chunk is being removed
+                    // between the file was discovered and an instance
+                    // of an EventSet was constructed. Just ignore,
+                    // and retry later.
+                }
+                try {
+                    pathSet.wait(1000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            return null;
+        }
+
+        private boolean updatePaths() throws IOException {
+            boolean foundNew = false;
+            List<Path> added = new ArrayList<>();
+            Set<Path> current = new HashSet<>();
+            if (!Files.exists(repostory)) {
+                // Repository removed, probably due to shutdown
+                return true;
+            }
+            try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repostory, "*.jfr")) {
+                for (Path p : dirStream) {
+                    if (!pathLookup.containsKey(p)) {
+                        added.add(p);
+                        Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
+                    }
+                    current.add(p);
+                }
+            }
+            List<Path> removed = new ArrayList<>();
+            for (Path p : pathLookup.keySet()) {
+                if (!current.contains(p)) {
+                    removed.add(p);
+                }
+            }
+
+            for (Path remove : removed) {
+                Long time = pathLookup.get(remove);
+                pathSet.remove(time);
+                pathLookup.remove(remove);
+            }
+            Collections.sort(added, (p1, p2) -> p1.compareTo(p2));
+            for (Path p : added) {
+                // Only add files that have a complete header
+                // as the JVM may be in progress writing the file
+                long size = Files.size(p);
+                if (size >= ChunkHeader.HEADER_SIZE) {
+                    long startNanos = readStartTime(p);
+                    pathSet.put(startNanos, p);
+                    pathLookup.put(p, startNanos);
+                    foundNew = true;
+                }
+            }
+            return foundNew;
+        }
+
+        private long readStartTime(Path p) throws IOException {
+            try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
+                ChunkHeader c = new ChunkHeader(in);
+                return c.getStartNanos();
+            }
+        }
+
+        public void close() {
+            this.closed = true;
+            synchronized (pathSet) {
+                pathSet.notify();
+            }
+        }
+    }
+
+    static final class ParserConsumer extends EventConsumer {
+
+        private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
+        private static final int DEFAULT_ARRAY_SIZE = 10_000;
+        private final RepositoryFiles repositoryFiles;
+        private ChunkParser chunkParser;
+        private boolean reuse = true;
+        private RecordedEvent[] sortedList;
+        private boolean ordered;
+
+        public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
+            super(acc);
+            repositoryFiles = new RepositoryFiles(p);
+        }
+
+        @Override
+        public void process() throws IOException {
+            Path path = repositoryFiles.nextPath(null);
+            try (RecordingInput input = new RecordingInput(path.toFile())) {
+                chunkParser = new ChunkParser(input, reuse);
+                while (!isClosed()) {
+                    boolean reuse = this.reuse;
+                    boolean ordered = this.ordered;
+                    chunkParser.setReuse(reuse);
+                    chunkParser.setOrdered(ordered);
+                    chunkParser.resetEventCache();
+                    chunkParser.updateEventParsers();
+
+                    if (ordered) {
+                        processOrdered2();
+                    } else {
+                        processUnordered();
+                    }
+                    if (chunkParser.isLastChunk()) {
+                        return;
+                    }
+                    path = repositoryFiles.nextPath(path);
+                    input.newFile(path);
+                }
+            }
+        }
+
+        private void processOrdered2() throws IOException {
+            if (sortedList == null) {
+                sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
+            }
+            while (true) {
+                boolean reuse = this.reuse;
+                boolean ordered = this.ordered;
+                chunkParser.setReuse(reuse);
+                chunkParser.setOrdered(ordered);
+                chunkParser.resetEventCache();
+                chunkParser.updateEventParsers();
+                boolean awaitNewEvents = false;
+                int index = 0;
+                while (true) {
+                    RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+                    if (e == null) {
+                        // wait for new event with next call to
+                        // readStreamingEvent()
+                        awaitNewEvents = true;
+                        break;
+                    }
+                    awaitNewEvents = false;
+                    if (index == sortedList.length) {
+                        sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
+                    }
+                    sortedList[index++] = e;
+                }
+
+                // no events found
+                if (index == 0 && chunkParser.isChunkFinished()) {
+                    return;
+                }
+                // at least 2 events, sort them
+                if (index > 1) {
+                    Arrays.sort(sortedList, 0, index, END_TIME);
+                }
+                for (int i = 0; i < index; i++) {
+                    dispatch(sortedList[i]);
+                }
+                if (chunkParser.isChunkFinished()) {
+                    return;
+                }
+                runFlushActions();
+            }
+        }
+
+        private void processUnordered() throws IOException {
+            boolean awaitNewEvents = false;
+            while (true) {
+                boolean reuse = this.reuse;
+                boolean ordered = this.ordered;
+                chunkParser.setReuse(reuse);
+                chunkParser.setOrdered(ordered);
+                chunkParser.resetEventCache();
+                chunkParser.updateEventParsers();
+                while (true) {
+                    RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+                    if (e == null) {
+                        awaitNewEvents = true;
+                        break;
+                    }
+                    dispatch(e);
+                }
+                runFlushActions();
+            }
+        }
+
+        public void setReuse(boolean reuse) {
+            this.reuse = reuse;
+        }
+
+        public void setOrdered(boolean ordered) {
+            this.ordered = ordered;
+        }
+
+        @Override
+        public void close() {
+            repositoryFiles.close();
+        }
+    }
+
+    static final class SharedParserConsumer extends EventConsumer {
         private EventSetLocation location;
         private EventSet eventSet;
         private int eventSetIndex;
         private int eventArrayIndex;
         private RecordedEvent[] currentEventArray = new RecordedEvent[0];
 
-        public EventSetConsumer(AccessControlContext acc) throws IOException {
+        public SharedParserConsumer(AccessControlContext acc) throws IOException {
             super(acc);
         }
 
@@ -108,10 +352,11 @@
         }
     }
 
-    private final EventSetConsumer eventConsumer;
+    private final EventConsumer eventConsumer;
 
     public EventDirectoryStream(AccessControlContext acc) throws IOException {
-        eventConsumer = new EventSetConsumer(acc);
+        // Path p = Repository.getRepository().getRepositoryPath().toPath();
+        eventConsumer = new SharedParserConsumer(acc);
     }
 
     public void close() {
@@ -184,8 +429,6 @@
 
     @Override
     public void setOrdered(boolean ordered) {
-      if (ordered == false) {
-          throw new UnsupportedOperationException("Unordered not implemented yet");
-      }
+        eventConsumer.setOrdered(ordered);
     }
 }