--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon Jun 03 16:21:47 2019 +0200
@@ -26,24 +26,268 @@
package jdk.jfr.consumer;
import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.security.AccessControlContext;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.function.Consumer;
+import jdk.jfr.internal.LogLevel;
+import jdk.jfr.internal.LogTag;
+import jdk.jfr.internal.Logger;
+import jdk.jfr.internal.Repository;
+import jdk.jfr.internal.consumer.ChunkHeader;
import jdk.jfr.internal.consumer.EventConsumer;
+import jdk.jfr.internal.consumer.RecordingInput;
final class EventDirectoryStream implements EventStream {
- private static class EventSetConsumer extends EventConsumer {
+ private static final class RepositoryFiles {
+ private final Path repostory;
+ private final SortedMap<Long, Path> pathSet = new TreeMap<>();
+ private final Map<Path, Long> pathLookup = new HashMap<>();
+ private volatile boolean closed;
+
+ public RepositoryFiles(Path repostory) {
+ this.repostory = repostory;
+ }
+
+ Path nextPath(Path previous) {
+ long startTimeNanos = 0L;
+ if (previous != null) {
+ startTimeNanos = pathLookup.get(previous);
+ }
+ SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
+ if (!after.isEmpty()) {
+ Path path = after.get(after.firstKey());
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
+ return path;
+ }
+ while (!closed) {
+ try {
+ if (updatePaths()) {
+ continue;
+ }
+ } catch (IOException e) {
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during repository file scan " + e.getMessage());
+ // This can happen if a chunk is being removed
+ // between the file was discovered and an instance
+ // of an EventSet was constructed. Just ignore,
+ // and retry later.
+ }
+ try {
+ pathSet.wait(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return null;
+ }
+
+ private boolean updatePaths() throws IOException {
+ boolean foundNew = false;
+ List<Path> added = new ArrayList<>();
+ Set<Path> current = new HashSet<>();
+ if (!Files.exists(repostory)) {
+ // Repository removed, probably due to shutdown
+ return true;
+ }
+ try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repostory, "*.jfr")) {
+ for (Path p : dirStream) {
+ if (!pathLookup.containsKey(p)) {
+ added.add(p);
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
+ }
+ current.add(p);
+ }
+ }
+ List<Path> removed = new ArrayList<>();
+ for (Path p : pathLookup.keySet()) {
+ if (!current.contains(p)) {
+ removed.add(p);
+ }
+ }
+
+ for (Path remove : removed) {
+ Long time = pathLookup.get(remove);
+ pathSet.remove(time);
+ pathLookup.remove(remove);
+ }
+ Collections.sort(added, (p1, p2) -> p1.compareTo(p2));
+ for (Path p : added) {
+ // Only add files that have a complete header
+ // as the JVM may be in progress writing the file
+ long size = Files.size(p);
+ if (size >= ChunkHeader.HEADER_SIZE) {
+ long startNanos = readStartTime(p);
+ pathSet.put(startNanos, p);
+ pathLookup.put(p, startNanos);
+ foundNew = true;
+ }
+ }
+ return foundNew;
+ }
+
+ private long readStartTime(Path p) throws IOException {
+ try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
+ ChunkHeader c = new ChunkHeader(in);
+ return c.getStartNanos();
+ }
+ }
+
+ public void close() {
+ this.closed = true;
+ synchronized (pathSet) {
+ pathSet.notify();
+ }
+ }
+ }
+
+ static final class ParserConsumer extends EventConsumer {
+
+ private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
+ private static final int DEFAULT_ARRAY_SIZE = 10_000;
+ private final RepositoryFiles repositoryFiles;
+ private ChunkParser chunkParser;
+ private boolean reuse = true;
+ private RecordedEvent[] sortedList;
+ private boolean ordered;
+
+ public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
+ super(acc);
+ repositoryFiles = new RepositoryFiles(p);
+ }
+
+ @Override
+ public void process() throws IOException {
+ Path path = repositoryFiles.nextPath(null);
+ try (RecordingInput input = new RecordingInput(path.toFile())) {
+ chunkParser = new ChunkParser(input, reuse);
+ while (!isClosed()) {
+ boolean reuse = this.reuse;
+ boolean ordered = this.ordered;
+ chunkParser.setReuse(reuse);
+ chunkParser.setOrdered(ordered);
+ chunkParser.resetEventCache();
+ chunkParser.updateEventParsers();
+
+ if (ordered) {
+ processOrdered2();
+ } else {
+ processUnordered();
+ }
+ if (chunkParser.isLastChunk()) {
+ return;
+ }
+ path = repositoryFiles.nextPath(path);
+ input.newFile(path);
+ }
+ }
+ }
+
+ private void processOrdered2() throws IOException {
+ if (sortedList == null) {
+ sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
+ }
+ while (true) {
+ boolean reuse = this.reuse;
+ boolean ordered = this.ordered;
+ chunkParser.setReuse(reuse);
+ chunkParser.setOrdered(ordered);
+ chunkParser.resetEventCache();
+ chunkParser.updateEventParsers();
+ boolean awaitNewEvents = false;
+ int index = 0;
+ while (true) {
+ RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+ if (e == null) {
+ // wait for new event with next call to
+ // readStreamingEvent()
+ awaitNewEvents = true;
+ break;
+ }
+ awaitNewEvents = false;
+ if (index == sortedList.length) {
+ sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
+ }
+ sortedList[index++] = e;
+ }
+
+ // no events found
+ if (index == 0 && chunkParser.isChunkFinished()) {
+ return;
+ }
+ // at least 2 events, sort them
+ if (index > 1) {
+ Arrays.sort(sortedList, 0, index, END_TIME);
+ }
+ for (int i = 0; i < index; i++) {
+ dispatch(sortedList[i]);
+ }
+ if (chunkParser.isChunkFinished()) {
+ return;
+ }
+ runFlushActions();
+ }
+ }
+
+ private void processUnordered() throws IOException {
+ boolean awaitNewEvents = false;
+ while (true) {
+ boolean reuse = this.reuse;
+ boolean ordered = this.ordered;
+ chunkParser.setReuse(reuse);
+ chunkParser.setOrdered(ordered);
+ chunkParser.resetEventCache();
+ chunkParser.updateEventParsers();
+ while (true) {
+ RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+ if (e == null) {
+ awaitNewEvents = true;
+ break;
+ }
+ dispatch(e);
+ }
+ runFlushActions();
+ }
+ }
+
+ public void setReuse(boolean reuse) {
+ this.reuse = reuse;
+ }
+
+ public void setOrdered(boolean ordered) {
+ this.ordered = ordered;
+ }
+
+ @Override
+ public void close() {
+ repositoryFiles.close();
+ }
+ }
+
+ static final class SharedParserConsumer extends EventConsumer {
private EventSetLocation location;
private EventSet eventSet;
private int eventSetIndex;
private int eventArrayIndex;
private RecordedEvent[] currentEventArray = new RecordedEvent[0];
- public EventSetConsumer(AccessControlContext acc) throws IOException {
+ public SharedParserConsumer(AccessControlContext acc) throws IOException {
super(acc);
}
@@ -108,10 +352,11 @@
}
}
- private final EventSetConsumer eventConsumer;
+ private final EventConsumer eventConsumer;
public EventDirectoryStream(AccessControlContext acc) throws IOException {
- eventConsumer = new EventSetConsumer(acc);
+ // Path p = Repository.getRepository().getRepositoryPath().toPath();
+ eventConsumer = new SharedParserConsumer(acc);
}
public void close() {
@@ -184,8 +429,6 @@
@Override
public void setOrdered(boolean ordered) {
- if (ordered == false) {
- throw new UnsupportedOperationException("Unordered not implemented yet");
- }
+ eventConsumer.setOrdered(ordered);
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Mon Jun 03 16:21:47 2019 +0200
@@ -125,6 +125,11 @@
public void setOrdered(boolean ordered) {
this.ordered = ordered;
}
+
+ @Override
+ public void close() {
+
+ }
}
private final RecordingInput input;
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Mon Jun 03 16:21:47 2019 +0200
@@ -163,7 +163,6 @@
*/
public void setOrdered(boolean ordered);
-
/**
* Starts processing events in the stream.
* <p>
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Mon Jun 03 16:21:47 2019 +0200
@@ -29,6 +29,7 @@
import java.security.AccessControlContext;
import java.security.AccessController;
import java.time.Duration;
+import java.util.Map;
import java.util.function.Consumer;
import jdk.jfr.Configuration;
@@ -138,6 +139,37 @@
}
/**
+ * Replaces all settings for this recording stream
+ * <p>
+ * The following example shows how to set event settings for a recording.
+ *
+ * <pre>
+ * <code>
+ * Map{@literal <}String, String{@literal >} settings = new HashMap{@literal <}{@literal >}();
+ * settings.putAll(EventSettings.enabled("jdk.CPUSample").withPeriod(Duration.ofSeconds(2)).toMap());
+ * settings.putAll(EventSettings.enabled(MyEvent.class).withThreshold(Duration.ofSeconds(2)).withoutStackTrace().toMap());
+ * settings.put("jdk.ExecutionSample#period", "10 ms");
+ * recordingStream.setSettings(settings);
+ * </code>
+ * </pre>
+ *
+ * The following example shows how to merge settings.
+ *
+ * <pre>
+ * {@code
+ * Map<String, String> settings = recording.getSettings();
+ * settings.putAll(additionalSettings);
+ * recordingStream.setSettings(settings);
+ * }
+ * </pre>
+ *
+ * @param settings the settings to set, not {@code null}
+ */
+ public void setSettings(Map<String, String> settings) {
+ recording.setSettings(settings);
+ };
+
+ /**
* Enables event.
*
* @param eventClass the event to enable, not {@code null}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Mon Jun 03 16:21:47 2019 +0200
@@ -326,4 +326,14 @@
return a.toArray(new Runnable[0]);
}
+ abstract public void close();
+
+ public void setReuse(boolean reuse) {
+
+ }
+
+ public void setOrdered(boolean ordered) {
+
+ }
+
}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java Fri May 31 20:44:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java Mon Jun 03 16:21:47 2019 +0200
@@ -30,6 +30,7 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.file.Path;
public final class RecordingInput implements DataInput, AutoCloseable {
@@ -57,6 +58,11 @@
public byte get(long position) {
return bytes[(int) (position - blockPosition)];
}
+
+ public void reset() {
+ blockPosition = 0;
+ size = 0;
+ }
}
private final RandomAccessFile file;
@@ -79,6 +85,7 @@
public RecordingInput(File f) throws IOException {
this(f, DEFAULT_BLOCK_SIZE);
}
+
public void positionPhysical(long position) throws IOException {
file.seek(position);
}
@@ -326,4 +333,21 @@
return filename;
}
+ // Purpose of this method is to reuse block cache from a
+ // previous RecordingInput
+ public RecordingInput newFile(Path path) throws IOException {
+ try {
+ close();
+ } catch (IOException e) {
+ // perhaps deleted
+ }
+ RecordingInput input = new RecordingInput(path.toFile(), this.blockSize);
+ input.currentBlock = this.currentBlock;
+ input.currentBlock.reset();
+ input.previousBlock = this.previousBlock;
+ input.previousBlock.reset();
+
+ return input;
+ }
+
}