First parts of unshared parser + updated javadoc JEP-349-branch
authoregahlin
Mon, 03 Jun 2019 16:21:47 +0200
branchJEP-349-branch
changeset 57386 acdd0dbe37ee
parent 57385 7d9d4f629f6e
child 57425 1da8552f0b59
First parts of unshared parser + updated javadoc
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Mon Jun 03 16:21:47 2019 +0200
@@ -26,24 +26,268 @@
 package jdk.jfr.consumer;
 
 import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.security.AccessControlContext;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.function.Consumer;
 
+import jdk.jfr.internal.LogLevel;
+import jdk.jfr.internal.LogTag;
+import jdk.jfr.internal.Logger;
+import jdk.jfr.internal.Repository;
+import jdk.jfr.internal.consumer.ChunkHeader;
 import jdk.jfr.internal.consumer.EventConsumer;
+import jdk.jfr.internal.consumer.RecordingInput;
 
 final class EventDirectoryStream implements EventStream {
 
-    private static class EventSetConsumer extends EventConsumer {
+    private static final class RepositoryFiles {
+        private final Path repostory;
+        private final SortedMap<Long, Path> pathSet = new TreeMap<>();
+        private final Map<Path, Long> pathLookup = new HashMap<>();
+        private volatile boolean closed;
+
+        public RepositoryFiles(Path repostory) {
+            this.repostory = repostory;
+        }
+
+        Path nextPath(Path previous) {
+            long startTimeNanos = 0L;
+            if (previous != null) {
+                startTimeNanos = pathLookup.get(previous);
+            }
+            SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
+            if (!after.isEmpty()) {
+                Path path = after.get(after.firstKey());
+                Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
+                return path;
+            }
+            while (!closed) {
+                try {
+                    if (updatePaths()) {
+                        continue;
+                    }
+                } catch (IOException e) {
+                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during repository file scan " + e.getMessage());
+                    // This can happen if a chunk is being removed
+                    // between the file was discovered and an instance
+                    // of an EventSet was constructed. Just ignore,
+                    // and retry later.
+                }
+                try {
+                    pathSet.wait(1000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            return null;
+        }
+
+        private boolean updatePaths() throws IOException {
+            boolean foundNew = false;
+            List<Path> added = new ArrayList<>();
+            Set<Path> current = new HashSet<>();
+            if (!Files.exists(repostory)) {
+                // Repository removed, probably due to shutdown
+                return true;
+            }
+            try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repostory, "*.jfr")) {
+                for (Path p : dirStream) {
+                    if (!pathLookup.containsKey(p)) {
+                        added.add(p);
+                        Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
+                    }
+                    current.add(p);
+                }
+            }
+            List<Path> removed = new ArrayList<>();
+            for (Path p : pathLookup.keySet()) {
+                if (!current.contains(p)) {
+                    removed.add(p);
+                }
+            }
+
+            for (Path remove : removed) {
+                Long time = pathLookup.get(remove);
+                pathSet.remove(time);
+                pathLookup.remove(remove);
+            }
+            Collections.sort(added, (p1, p2) -> p1.compareTo(p2));
+            for (Path p : added) {
+                // Only add files that have a complete header
+                // as the JVM may be in progress writing the file
+                long size = Files.size(p);
+                if (size >= ChunkHeader.HEADER_SIZE) {
+                    long startNanos = readStartTime(p);
+                    pathSet.put(startNanos, p);
+                    pathLookup.put(p, startNanos);
+                    foundNew = true;
+                }
+            }
+            return foundNew;
+        }
+
+        private long readStartTime(Path p) throws IOException {
+            try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
+                ChunkHeader c = new ChunkHeader(in);
+                return c.getStartNanos();
+            }
+        }
+
+        public void close() {
+            this.closed = true;
+            synchronized (pathSet) {
+                pathSet.notify();
+            }
+        }
+    }
+
+    static final class ParserConsumer extends EventConsumer {
+
+        private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
+        private static final int DEFAULT_ARRAY_SIZE = 10_000;
+        private final RepositoryFiles repositoryFiles;
+        private ChunkParser chunkParser;
+        private boolean reuse = true;
+        private RecordedEvent[] sortedList;
+        private boolean ordered;
+
+        public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
+            super(acc);
+            repositoryFiles = new RepositoryFiles(p);
+        }
+
+        @Override
+        public void process() throws IOException {
+            Path path = repositoryFiles.nextPath(null);
+            try (RecordingInput input = new RecordingInput(path.toFile())) {
+                chunkParser = new ChunkParser(input, reuse);
+                while (!isClosed()) {
+                    boolean reuse = this.reuse;
+                    boolean ordered = this.ordered;
+                    chunkParser.setReuse(reuse);
+                    chunkParser.setOrdered(ordered);
+                    chunkParser.resetEventCache();
+                    chunkParser.updateEventParsers();
+
+                    if (ordered) {
+                        processOrdered2();
+                    } else {
+                        processUnordered();
+                    }
+                    if (chunkParser.isLastChunk()) {
+                        return;
+                    }
+                    path = repositoryFiles.nextPath(path);
+                    input.newFile(path);
+                }
+            }
+        }
+
+        private void processOrdered2() throws IOException {
+            if (sortedList == null) {
+                sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
+            }
+            while (true) {
+                boolean reuse = this.reuse;
+                boolean ordered = this.ordered;
+                chunkParser.setReuse(reuse);
+                chunkParser.setOrdered(ordered);
+                chunkParser.resetEventCache();
+                chunkParser.updateEventParsers();
+                boolean awaitNewEvents = false;
+                int index = 0;
+                while (true) {
+                    RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+                    if (e == null) {
+                        // wait for new event with next call to
+                        // readStreamingEvent()
+                        awaitNewEvents = true;
+                        break;
+                    }
+                    awaitNewEvents = false;
+                    if (index == sortedList.length) {
+                        sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
+                    }
+                    sortedList[index++] = e;
+                }
+
+                // no events found
+                if (index == 0 && chunkParser.isChunkFinished()) {
+                    return;
+                }
+                // at least 2 events, sort them
+                if (index > 1) {
+                    Arrays.sort(sortedList, 0, index, END_TIME);
+                }
+                for (int i = 0; i < index; i++) {
+                    dispatch(sortedList[i]);
+                }
+                if (chunkParser.isChunkFinished()) {
+                    return;
+                }
+                runFlushActions();
+            }
+        }
+
+        private void processUnordered() throws IOException {
+            boolean awaitNewEvents = false;
+            while (true) {
+                boolean reuse = this.reuse;
+                boolean ordered = this.ordered;
+                chunkParser.setReuse(reuse);
+                chunkParser.setOrdered(ordered);
+                chunkParser.resetEventCache();
+                chunkParser.updateEventParsers();
+                while (true) {
+                    RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+                    if (e == null) {
+                        awaitNewEvents = true;
+                        break;
+                    }
+                    dispatch(e);
+                }
+                runFlushActions();
+            }
+        }
+
+        public void setReuse(boolean reuse) {
+            this.reuse = reuse;
+        }
+
+        public void setOrdered(boolean ordered) {
+            this.ordered = ordered;
+        }
+
+        @Override
+        public void close() {
+            repositoryFiles.close();
+        }
+    }
+
+    static final class SharedParserConsumer extends EventConsumer {
         private EventSetLocation location;
         private EventSet eventSet;
         private int eventSetIndex;
         private int eventArrayIndex;
         private RecordedEvent[] currentEventArray = new RecordedEvent[0];
 
-        public EventSetConsumer(AccessControlContext acc) throws IOException {
+        public SharedParserConsumer(AccessControlContext acc) throws IOException {
             super(acc);
         }
 
@@ -108,10 +352,11 @@
         }
     }
 
-    private final EventSetConsumer eventConsumer;
+    private final EventConsumer eventConsumer;
 
     public EventDirectoryStream(AccessControlContext acc) throws IOException {
-        eventConsumer = new EventSetConsumer(acc);
+        // Path p = Repository.getRepository().getRepositoryPath().toPath();
+        eventConsumer = new SharedParserConsumer(acc);
     }
 
     public void close() {
@@ -184,8 +429,6 @@
 
     @Override
     public void setOrdered(boolean ordered) {
-      if (ordered == false) {
-          throw new UnsupportedOperationException("Unordered not implemented yet");
-      }
+        eventConsumer.setOrdered(ordered);
     }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Mon Jun 03 16:21:47 2019 +0200
@@ -125,6 +125,11 @@
         public void setOrdered(boolean ordered) {
             this.ordered = ordered;
         }
+
+        @Override
+        public void close() {
+
+        }
     }
 
     private final RecordingInput input;
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Mon Jun 03 16:21:47 2019 +0200
@@ -163,7 +163,6 @@
      */
     public void setOrdered(boolean ordered);
 
-
     /**
      * Starts processing events in the stream.
      * <p>
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Mon Jun 03 16:21:47 2019 +0200
@@ -29,6 +29,7 @@
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.time.Duration;
+import java.util.Map;
 import java.util.function.Consumer;
 
 import jdk.jfr.Configuration;
@@ -138,6 +139,37 @@
     }
 
     /**
+     * Replaces all settings for this recording stream
+     * <p>
+     * The following example shows how to set event settings for a recording.
+     *
+     * <pre>
+     * <code>
+     *     Map{@literal <}String, String{@literal >} settings = new HashMap{@literal <}{@literal >}();
+     *     settings.putAll(EventSettings.enabled("jdk.CPUSample").withPeriod(Duration.ofSeconds(2)).toMap());
+     *     settings.putAll(EventSettings.enabled(MyEvent.class).withThreshold(Duration.ofSeconds(2)).withoutStackTrace().toMap());
+     *     settings.put("jdk.ExecutionSample#period", "10 ms");
+     *     recordingStream.setSettings(settings);
+     * </code>
+     * </pre>
+     *
+     * The following example shows how to merge settings.
+     *
+     * <pre>
+     *     {@code
+     *     Map<String, String> settings = recording.getSettings();
+     *     settings.putAll(additionalSettings);
+     *     recordingStream.setSettings(settings);
+     * }
+     * </pre>
+     *
+     * @param settings the settings to set, not {@code null}
+     */
+    public void setSettings(Map<String, String> settings) {
+        recording.setSettings(settings);
+    };
+
+    /**
      * Enables event.
      *
      * @param eventClass the event to enable, not {@code null}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Mon Jun 03 16:21:47 2019 +0200
@@ -326,4 +326,14 @@
         return a.toArray(new Runnable[0]);
     }
 
+    abstract public void close();
+
+    public void setReuse(boolean reuse) {
+
+    }
+
+    public void setOrdered(boolean ordered) {
+
+    }
+
 }
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java	Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java	Mon Jun 03 16:21:47 2019 +0200
@@ -30,6 +30,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.file.Path;
 
 public final class RecordingInput implements DataInput, AutoCloseable {
 
@@ -57,6 +58,11 @@
         public byte get(long position) {
             return bytes[(int) (position - blockPosition)];
         }
+
+        public void reset() {
+           blockPosition = 0;
+           size = 0;
+        }
     }
 
     private final RandomAccessFile file;
@@ -79,6 +85,7 @@
     public RecordingInput(File f) throws IOException {
         this(f, DEFAULT_BLOCK_SIZE);
     }
+
     public void positionPhysical(long position) throws IOException {
         file.seek(position);
     }
@@ -326,4 +333,21 @@
         return filename;
     }
 
+    // Purpose of this method is to reuse block cache from a
+    // previous RecordingInput
+    public RecordingInput newFile(Path path) throws IOException  {
+        try {
+            close();
+        } catch (IOException e) {
+            // perhaps deleted
+        }
+        RecordingInput input = new RecordingInput(path.toFile(), this.blockSize);
+        input.currentBlock = this.currentBlock;
+        input.currentBlock.reset();
+        input.previousBlock = this.previousBlock;
+        input.previousBlock.reset();
+
+        return input;
+    }
+
 }