--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Wed Jun 26 16:04:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Jun 27 10:19:32 2019 +0200
@@ -26,33 +26,18 @@
package jdk.jfr.consumer;
import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.security.AccessControlContext;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
import java.util.function.Consumer;
-import jdk.jfr.internal.LogLevel;
-import jdk.jfr.internal.LogTag;
-import jdk.jfr.internal.Logger;
-import jdk.jfr.internal.Repository;
-import jdk.jfr.internal.consumer.ChunkHeader;
import jdk.jfr.internal.consumer.EventConsumer;
import jdk.jfr.internal.consumer.RecordingInput;
+import jdk.jfr.internal.consumer.RepositoryFiles;
/**
* Implementation of an {@code EventStream}} that operates against a directory
@@ -61,112 +46,6 @@
*/
final class EventDirectoryStream implements EventStream {
- private static final class RepositoryFiles {
- private final Path repostory;
- private final SortedMap<Long, Path> pathSet = new TreeMap<>();
- private final Map<Path, Long> pathLookup = new HashMap<>();
- private volatile boolean closed;
-
- public RepositoryFiles(Path repostory) {
- this.repostory = repostory;
- }
-
- long getTimestamp(Path p) {
- return pathLookup.get(p);
- }
-
- Path nextPath(long startTimeNanos) {
- while (!closed) {
- SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
- if (!after.isEmpty()) {
- Path path = after.get(after.firstKey());
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
- return path;
- }
- try {
- if (updatePaths(repostory)) {
- continue;
- }
- } catch (IOException e) {
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during repository file scan " + e.getMessage());
- // This can happen if a chunk is being removed
- // between the file was discovered and an instance
- // of an EventSet was constructed. Just ignore,
- // and retry later.
- }
- try {
- synchronized (pathSet) {
- pathSet.wait(1000);
- }
- } catch (InterruptedException e) {
- // ignore
- }
- }
- return null;
- }
-
- private boolean updatePaths(Path repo) throws IOException {
- if (repo == null) {
- repo = Repository.getRepository().getRepositoryPath().toPath();
- }
- boolean foundNew = false;
- List<Path> added = new ArrayList<>();
- Set<Path> current = new HashSet<>();
- if (!Files.exists(repo)) {
- // Repository removed, probably due to shutdown
- return true;
- }
- try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo, "*.jfr")) {
- for (Path p : dirStream) {
- if (!pathLookup.containsKey(p)) {
- added.add(p);
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
- }
- current.add(p);
- }
- }
- List<Path> removed = new ArrayList<>();
- for (Path p : pathLookup.keySet()) {
- if (!current.contains(p)) {
- removed.add(p);
- }
- }
-
- for (Path remove : removed) {
- Long time = pathLookup.get(remove);
- pathSet.remove(time);
- pathLookup.remove(remove);
- }
- Collections.sort(added, (p1, p2) -> p1.compareTo(p2));
- for (Path p : added) {
- // Only add files that have a complete header
- // as the JVM may be in progress writing the file
- long size = Files.size(p);
- if (size >= ChunkHeader.HEADER_SIZE) {
- long startNanos = readStartTime(p);
- pathSet.put(startNanos, p);
- pathLookup.put(p, startNanos);
- foundNew = true;
- }
- }
- return foundNew;
- }
-
- private long readStartTime(Path p) throws IOException {
- try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
- ChunkHeader c = new ChunkHeader(in);
- return c.getStartNanos();
- }
- }
-
- public void close() {
- synchronized (pathSet) {
- this.closed = true;
- pathSet.notify();
- }
- }
- }
-
static final class ParserConsumer extends EventConsumer {
private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
@@ -260,92 +139,12 @@
return awaitNewEvents;
}
- public void setReuse(boolean reuse) {
- this.reuse = reuse;
- }
-
- public void setOrdered(boolean ordered) {
- this.ordered = ordered;
- }
-
@Override
public void close() {
repositoryFiles.close();
}
}
- static final class SharedParserConsumer extends EventConsumer {
- private EventSetLocation location;
- private EventSet eventSet;
- private int eventSetIndex;
- private int eventArrayIndex;
- private RecordedEvent[] currentEventArray = new RecordedEvent[0];
-
- public SharedParserConsumer(AccessControlContext acc) throws IOException {
- super(acc);
- }
-
- public void process() throws IOException {
- this.location = EventSetLocation.current();
- this.eventSet = location.acquire(startNanos, null); // use timestamp
- // from
- if (eventSet == null) {
- return;
- }
- while (!isClosed()) {
- processSegment();
- runFlushActions();
- do {
- if (isClosed()) {
- return;
- }
- currentEventArray = eventSet.readEvents(eventSetIndex);
- if (currentEventArray == EventSet.END_OF_SET) {
- eventSet = eventSet.next(eventFilter);
- if (eventSet == null || isClosed()) {
- return;
- }
- eventSetIndex = 0;
- continue;
- }
- if (currentEventArray == null) {
- return; // no more events
- }
- eventSetIndex++;
- } while (currentEventArray.length == 0);
- eventArrayIndex = 0;
- }
- }
-
- private void processSegment() {
- while (eventArrayIndex < currentEventArray.length) {
- RecordedEvent e = currentEventArray[eventArrayIndex++];
- if (e == null) {
- return;
- }
- dispatch(e);
- }
- }
-
- public void close() {
- setClosed(true);
- // TODO: Data races here, must fix
- synchronized (this) {
- if (eventSet != null) {
- eventSet.release(null);
- }
- if (location != null) {
- location.release();
- }
- }
- runCloseActions();
- }
-
- public void setReuse(boolean reuse) {
- // ignore hint
- }
- }
-
private final EventConsumer eventConsumer;
public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException {