Add streaming support for repository migration JEP-349-branch
authoregahlin
Thu, 15 Aug 2019 02:58:28 +0200
branchJEP-349-branch
changeset 57754 5693904ecbde
parent 57753 4883a96b6d37
child 57755 8173090d2794
Add streaming support for repository migration
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java
test/jdk/jdk/jfr/api/consumer/streaming/TestRepositoryMigration.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java	Thu Aug 15 02:55:30 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java	Thu Aug 15 02:58:28 2019 +0200
@@ -48,8 +48,8 @@
     private final FileAccess fileAccess;
     private final NavigableMap<Long, Path> pathSet = new TreeMap<>();
     private final Map<Path, Long> pathLookup = new HashMap<>();
+    private final Path repository;
     private volatile boolean closed;
-    private Path repository;
 
     public RepositoryFiles(FileAccess fileAccess, Path repository) {
         this.repository = repository;
@@ -129,17 +129,21 @@
     }
 
     private boolean updatePaths() throws IOException {
-        if (repository == null) {
-            SafePath p = Repository.getRepository().getRepositoryPath();
-            if (p == null) {
-                return false;
+        boolean foundNew = false;
+        Path repoPath = repository;
+        if (repoPath == null) {
+            // Always get the latest repository if 'jcmd JFR.configure
+            // repositorypath=...' has been executed
+            SafePath sf = Repository.getRepository().getRepositoryPath();
+            if (sf == null) {
+                return false; // not initialized
             }
-            repository = p.toPath();
+            repoPath = sf.toPath();
         }
-        boolean foundNew = false;
-        List<Path> added = new ArrayList<>();
-        Set<Path> current = new HashSet<>();
-        try (DirectoryStream<Path> dirStream = fileAccess.newDirectoryStream(repository)) {
+
+        try (DirectoryStream<Path> dirStream = fileAccess.newDirectoryStream(repoPath)) {
+            List<Path> added = new ArrayList<>();
+            Set<Path> current = new HashSet<>();
             for (Path p : dirStream) {
                 if (!pathLookup.containsKey(p)) {
                     String s = p.toString();
@@ -150,32 +154,32 @@
                     current.add(p);
                 }
             }
-        }
-        List<Path> removed = new ArrayList<>();
-        for (Path p : pathLookup.keySet()) {
-            if (!current.contains(p)) {
-                removed.add(p);
+            List<Path> removed = new ArrayList<>();
+            for (Path p : pathLookup.keySet()) {
+                if (!current.contains(p)) {
+                    removed.add(p);
+                }
+            }
+
+            for (Path remove : removed) {
+                Long time = pathLookup.get(remove);
+                pathSet.remove(time);
+                pathLookup.remove(remove);
             }
-        }
-
-        for (Path remove : removed) {
-            Long time = pathLookup.get(remove);
-            pathSet.remove(time);
-            pathLookup.remove(remove);
+            Collections.sort(added, (p1, p2) -> p1.compareTo(p2));
+            for (Path p : added) {
+                // Only add files that have a complete header
+                // as the JVM may be in progress writing the file
+                long size = fileAccess.fileSize(p);
+                if (size >= ChunkHeader.HEADER_SIZE) {
+                    long startNanos = readStartTime(p);
+                    pathSet.put(startNanos, p);
+                    pathLookup.put(p, startNanos);
+                    foundNew = true;
+                }
+            }
+            return foundNew;
         }
-        Collections.sort(added, (p1, p2) -> p1.compareTo(p2));
-        for (Path p : added) {
-            // Only add files that have a complete header
-            // as the JVM may be in progress writing the file
-            long size = fileAccess.fileSize(p);
-            if (size >= ChunkHeader.HEADER_SIZE) {
-                long startNanos = readStartTime(p);
-                pathSet.put(startNanos, p);
-                pathLookup.put(p, startNanos);
-                foundNew = true;
-            }
-        }
-        return foundNew;
     }
 
     private long readStartTime(Path p) throws IOException {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestRepositoryMigration.java	Thu Aug 15 02:58:28 2019 +0200
@@ -0,0 +1,76 @@
+package jdk.jfr.api.consumer.streaming;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.CountDownLatch;
+
+import jdk.jfr.Event;
+import jdk.jfr.Recording;
+import jdk.jfr.consumer.EventStream;
+import jdk.jfr.jcmd.JcmdHelper;
+
+/**
+ * @test
+ * @summary Verifies that is possible to stream from a repository that is being
+ *          moved.
+ * @key jfr
+ * @requires vm.hasJFR
+ * @library /test/lib /test/jdk
+ * @run main/othervm jdk.jfr.api.consumer.streaming.TestRepositoryMigration
+ */
+public class TestRepositoryMigration {
+    static class MigrationEvent extends Event {
+        int id;
+    }
+
+    public static void main(String... args) throws Exception {
+        Path newRepository = Paths.get("new-repository");
+        CountDownLatch events = new CountDownLatch(2);
+        try (EventStream es = EventStream.openRepository()) {
+            es.setStartTime(Instant.EPOCH);
+            es.onEvent(e -> {
+                System.out.println(e);
+                if (e.getInt("id") == 1) {
+                    events.countDown();
+                }
+                if (e.getInt("id") == 2) {
+                    events.countDown();
+                }
+            });
+            es.startAsync();
+            try (Recording r = new Recording()) {
+                r.setFlushInterval(Duration.ofSeconds(1));
+                r.start();
+                // Chunk in default repository
+                MigrationEvent e1 = new MigrationEvent();
+                e1.id = 1;
+                e1.commit();
+               JcmdHelper.jcmd("JFR.configure", "repositorypath=" + newRepository.toAbsolutePath());
+                // Chunk in new repository
+                MigrationEvent e2 = new MigrationEvent();
+                e2.id = 2;
+                e2.commit();
+                r.stop();
+                events.await();
+                // Verify that it happened in new repository
+                if (!Files.exists(newRepository)) {
+                    throw new AssertionError("Could not find repository " + newRepository);
+                }
+                System.out.println("Listing contents in new repository:");
+                boolean empty= true;
+                for (Path p: Files.newDirectoryStream(newRepository)) {
+                    System.out.println(p.toAbsolutePath());
+                    empty = false;
+                }
+                System.out.println();
+                if (empty) {
+                    throw new AssertionError("Could not find contents in new repository location " + newRepository);
+                }
+            }
+        }
+    }
+
+}