Stabilize TestLatestEvent.java JEP-349-branch
authoregahlin
Sat, 26 Oct 2019 23:59:51 +0200
branchJEP-349-branch
changeset 58806 a7d850b47b19
parent 58774 141412e96b12
child 58820 8412a437a4bc
child 58823 6a21dba79b81
Stabilize TestLatestEvent.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java
test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java	Thu Oct 24 07:02:36 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java	Sat Oct 26 23:59:51 2019 +0200
@@ -258,7 +258,7 @@
                     }
                 } else {
                     if (typeId != 0) { // Not metadata event
-                        Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Unknwon event type " + typeId);
+                        Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Unknown event type " + typeId);
                     }
                 }
             }
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java	Thu Oct 24 07:02:36 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java	Sat Oct 26 23:59:51 2019 +0200
@@ -144,6 +144,11 @@
                     return;
                 }
                 long durationNanos = currentParser.getChunkDuration();
+                if (durationNanos == 0) {
+                    // Avoid reading the same chunk again and again if
+                    // duration is 0 ns
+                    durationNanos++;
+                }
                 path = repositoryFiles.nextPath(currentChunkStartNanos + durationNanos);
                 if (path == null) {
                     return; // stream closed
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java	Thu Oct 24 07:02:36 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java	Sat Oct 26 23:59:51 2019 +0200
@@ -109,6 +109,24 @@
     }
 
     Path nextPath(long startTimeNanos) {
+        if (closed) {
+            return null;
+        }
+        // Try to get the 'exact' path first
+        // to avoid skipping files if repository
+        // is updated while DirectoryStream
+        // is traversing it
+        Path path = pathSet.get(startTimeNanos);
+        if (path != null) {
+            return path;
+        }
+        // Update paths
+        try {
+            updatePaths();
+        } catch (IOException e) {
+            // ignore
+        }
+        // try to get the next file
         return path(startTimeNanos);
     }
 
--- a/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java	Thu Oct 24 07:02:36 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java	Sat Oct 26 23:59:51 2019 +0200
@@ -25,10 +25,14 @@
 
 package jdk.jfr.api.consumer.streaming;
 
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import jdk.jfr.Event;
+import jdk.jfr.FlightRecorder;
 import jdk.jfr.Name;
 import jdk.jfr.Recording;
 import jdk.jfr.consumer.EventStream;
@@ -36,7 +40,7 @@
 
 /**
  * @test
- * @summary Verifies that a stream from a repository starts at the latest event
+ * @summary Verifies that EventStream::openRepository() read from the latest flush
  * @key jfr
  * @requires vm.hasJFR
  * @library /test/lib
@@ -46,6 +50,8 @@
 
     @Name("NotLatest")
     static class NotLatestEvent extends Event {
+
+        public int id;
     }
 
     @Name("Latest")
@@ -57,15 +63,23 @@
     }
 
     public static void main(String... args) throws Exception {
+        CountDownLatch notLatestEvent = new CountDownLatch(6);
+        CountDownLatch beginChunks = new CountDownLatch(1);
+
         try (RecordingStream r = new RecordingStream()) {
+            r.onEvent("MakeChunks", event-> {
+                System.out.println(event);
+                beginChunks.countDown();
+            });
+            r.onEvent("NotLatest", event -> {
+                System.out.println(event);
+                notLatestEvent.countDown();
+            });
             r.startAsync();
             MakeChunks e = new MakeChunks();
             e.commit();
-            CountDownLatch beginChunks = new CountDownLatch(1);
-            r.onEvent("MakeChunks", event-> {
-                beginChunks.countDown();
-            });
-            System.out.println("Waitning for first chunk");
+
+            System.out.println("Waiting for first chunk");
             beginChunks.await();
             // Create 5 chunks with events in the repository
             for (int i = 0; i < 5; i++) {
@@ -73,52 +87,49 @@
                 try (Recording r1 = new Recording()) {
                     r1.start();
                     NotLatestEvent notLatest = new NotLatestEvent();
+                    notLatest.id = i;
                     notLatest.commit();
                     r1.stop();
                 }
             }
             System.out.println("All empty chunks created");
-            // Create an event in a segment, typically the first.
+
+            // Create event in new chunk
             NotLatestEvent notLatest = new NotLatestEvent();
+            notLatest.id = 5;
             notLatest.commit();
+
+            // This latch ensures thatNotLatest has been
+            // flushed and a new valid position has been written
+            // to the chunk header
+            notLatestEvent.await(80, TimeUnit.SECONDS);
+            if (notLatestEvent.getCount() != 0) {
+               Recording rec =  FlightRecorder.getFlightRecorder().takeSnapshot();
+               Path p = Paths.get("c:\\testlatest\\not-latest.jfr");
+               rec.dump(p);
+               System.out.println("dumped " + p);
+               rec.close();
+            }
+
             try (EventStream s = EventStream.openRepository()) {
-                // Wait for next segment
-                // to prevent flush to prevent NotLatest to be included
-                awaitFlush(r);
                 System.out.println("EventStream opened");
                 AtomicBoolean foundLatest = new AtomicBoolean();
                 s.onEvent(event -> {
                     String name = event.getEventType().getName();
                     System.out.println("Found event " + name);
                     foundLatest.set(name.equals("Latest"));
-                    s.close();
                 });
-                System.out.println("Added onEvent handler");
                 s.startAsync();
-                // wait for next segment
-                awaitFlush(s);
-                // Emit the latest event
-                LatestEvent latest = new LatestEvent();
-                latest.commit();
-                System.out.println("Latest event emitted");
-                System.out.println("Waiting for termination");
-                s.awaitTermination();
-                if (!foundLatest.get()) {
-                    throw new Exception("Didn't find latest event!");
+                // Must loop here as there is no guarantee
+                // that the parser thread starts before event
+                // is flushed
+                while (!foundLatest.get()) {
+                    LatestEvent latest = new LatestEvent();
+                    latest.commit();
+                    System.out.println("Latest event emitted. Waiting 1 s ...");
+                    Thread.sleep(1000);
                 }
             }
         }
     }
-
-    private static void awaitFlush(EventStream stream) throws InterruptedException {
-        CountDownLatch latch = new CountDownLatch(1);
-        System.out.println("Waiting for flush...");
-        final Runnable l =  () -> {
-            System.out.println("Flush arrived!");
-            latch.countDown();
-        };
-        stream.onFlush(l);
-        latch.await();
-        stream.remove(l);
-    }
 }