src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branchJEP-349-branch
changeset 57433 83e4343a6984
parent 57432 ba454a26d2c1
child 57449 099789ceff7d
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Wed Jun 26 16:04:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Thu Jun 27 10:19:32 2019 +0200
@@ -26,33 +26,18 @@
 package jdk.jfr.consumer;
 
 import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.AccessControlContext;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.function.Consumer;
 
-import jdk.jfr.internal.LogLevel;
-import jdk.jfr.internal.LogTag;
-import jdk.jfr.internal.Logger;
-import jdk.jfr.internal.Repository;
-import jdk.jfr.internal.consumer.ChunkHeader;
 import jdk.jfr.internal.consumer.EventConsumer;
 import jdk.jfr.internal.consumer.RecordingInput;
+import jdk.jfr.internal.consumer.RepositoryFiles;
 
 /**
  * Implementation of an {@code EventStream}} that operates against a directory
@@ -61,112 +46,6 @@
  */
 final class EventDirectoryStream implements EventStream {
 
-    private static final class RepositoryFiles {
-        private final Path repostory;
-        private final SortedMap<Long, Path> pathSet = new TreeMap<>();
-        private final Map<Path, Long> pathLookup = new HashMap<>();
-        private volatile boolean closed;
-
-        public RepositoryFiles(Path repostory) {
-            this.repostory = repostory;
-        }
-
-        long getTimestamp(Path p) {
-            return pathLookup.get(p);
-        }
-
-        Path nextPath(long startTimeNanos) {
-            while (!closed) {
-                SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
-                if (!after.isEmpty()) {
-                    Path path = after.get(after.firstKey());
-                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
-                    return path;
-                }
-                try {
-                    if (updatePaths(repostory)) {
-                        continue;
-                    }
-                } catch (IOException e) {
-                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during repository file scan " + e.getMessage());
-                    // This can happen if a chunk is being removed
-                    // between the file was discovered and an instance
-                    // of an EventSet was constructed. Just ignore,
-                    // and retry later.
-                }
-                try {
-                    synchronized (pathSet) {
-                        pathSet.wait(1000);
-                    }
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
-            return null;
-        }
-
-        private boolean updatePaths(Path repo) throws IOException {
-            if (repo == null) {
-                repo = Repository.getRepository().getRepositoryPath().toPath();
-            }
-            boolean foundNew = false;
-            List<Path> added = new ArrayList<>();
-            Set<Path> current = new HashSet<>();
-            if (!Files.exists(repo)) {
-                // Repository removed, probably due to shutdown
-                return true;
-            }
-            try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo, "*.jfr")) {
-                for (Path p : dirStream) {
-                    if (!pathLookup.containsKey(p)) {
-                        added.add(p);
-                        Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
-                    }
-                    current.add(p);
-                }
-            }
-            List<Path> removed = new ArrayList<>();
-            for (Path p : pathLookup.keySet()) {
-                if (!current.contains(p)) {
-                    removed.add(p);
-                }
-            }
-
-            for (Path remove : removed) {
-                Long time = pathLookup.get(remove);
-                pathSet.remove(time);
-                pathLookup.remove(remove);
-            }
-            Collections.sort(added, (p1, p2) -> p1.compareTo(p2));
-            for (Path p : added) {
-                // Only add files that have a complete header
-                // as the JVM may be in progress writing the file
-                long size = Files.size(p);
-                if (size >= ChunkHeader.HEADER_SIZE) {
-                    long startNanos = readStartTime(p);
-                    pathSet.put(startNanos, p);
-                    pathLookup.put(p, startNanos);
-                    foundNew = true;
-                }
-            }
-            return foundNew;
-        }
-
-        private long readStartTime(Path p) throws IOException {
-            try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
-                ChunkHeader c = new ChunkHeader(in);
-                return c.getStartNanos();
-            }
-        }
-
-        public void close() {
-            synchronized (pathSet) {
-                this.closed = true;
-                pathSet.notify();
-            }
-        }
-    }
-
     static final class ParserConsumer extends EventConsumer {
 
         private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
@@ -260,92 +139,12 @@
             return awaitNewEvents;
         }
 
-        public void setReuse(boolean reuse) {
-            this.reuse = reuse;
-        }
-
-        public void setOrdered(boolean ordered) {
-            this.ordered = ordered;
-        }
-
         @Override
         public void close() {
             repositoryFiles.close();
         }
     }
 
-    static final class SharedParserConsumer extends EventConsumer {
-        private EventSetLocation location;
-        private EventSet eventSet;
-        private int eventSetIndex;
-        private int eventArrayIndex;
-        private RecordedEvent[] currentEventArray = new RecordedEvent[0];
-
-        public SharedParserConsumer(AccessControlContext acc) throws IOException {
-            super(acc);
-        }
-
-        public void process() throws IOException {
-            this.location = EventSetLocation.current();
-            this.eventSet = location.acquire(startNanos, null); // use timestamp
-                                                                // from
-            if (eventSet == null) {
-                return;
-            }
-            while (!isClosed()) {
-                processSegment();
-                runFlushActions();
-                do {
-                    if (isClosed()) {
-                        return;
-                    }
-                    currentEventArray = eventSet.readEvents(eventSetIndex);
-                    if (currentEventArray == EventSet.END_OF_SET) {
-                        eventSet = eventSet.next(eventFilter);
-                        if (eventSet == null || isClosed()) {
-                            return;
-                        }
-                        eventSetIndex = 0;
-                        continue;
-                    }
-                    if (currentEventArray == null) {
-                        return; // no more events
-                    }
-                    eventSetIndex++;
-                } while (currentEventArray.length == 0);
-                eventArrayIndex = 0;
-            }
-        }
-
-        private void processSegment() {
-            while (eventArrayIndex < currentEventArray.length) {
-                RecordedEvent e = currentEventArray[eventArrayIndex++];
-                if (e == null) {
-                    return;
-                }
-                dispatch(e);
-            }
-        }
-
-        public void close() {
-            setClosed(true);
-            // TODO: Data races here, must fix
-            synchronized (this) {
-                if (eventSet != null) {
-                    eventSet.release(null);
-                }
-                if (location != null) {
-                    location.release();
-                }
-            }
-            runCloseActions();
-        }
-
-        public void setReuse(boolean reuse) {
-            // ignore hint
-        }
-    }
-
     private final EventConsumer eventConsumer;
 
     public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException {