test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java
branchJEP-349-branch
changeset 58687 222f727e9b05
parent 57690 9316d02dd4a5
child 58724 3d0a172353fc
--- a/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java	Thu Oct 17 09:21:00 2019 -0700
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java	Fri Oct 18 17:45:17 2019 +0200
@@ -26,6 +26,7 @@
 package jdk.jfr.api.consumer.streaming;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import jdk.jfr.Event;
 import jdk.jfr.Name;
@@ -43,50 +44,68 @@
  */
 public class TestLatestEvent {
 
-    @Name("Chunk")
-    static class ChunkEvent extends Event {
-        boolean end;
+    @Name("NotLatest")
+    static class NotLatestEvent extends Event {
+    }
+
+    @Name("Latest")
+    static class LatestEvent extends Event {
+    }
+
+    @Name("MakeChunks")
+    static class MakeChunks extends Event {
     }
 
     public static void main(String... args) throws Exception {
         try (RecordingStream r = new RecordingStream()) {
             r.startAsync();
-            // Create chunks with events in the repository
+            MakeChunks e = new MakeChunks();
+            e.commit();
+            CountDownLatch beginChunks = new CountDownLatch(1);
+            r.onEvent("MakeChunks", event-> {
+                beginChunks.countDown();
+            });
+            beginChunks.await();
+            // Create 5 chunks with events in the repository
             for (int i = 0; i < 5; i++) {
                 try (Recording r1 = new Recording()) {
                     r1.start();
-                    ChunkEvent e = new ChunkEvent();
-                    e.end = false;
-                    e.commit();
+                    NotLatestEvent notLatest = new NotLatestEvent();
+                    notLatest.commit();
                     r1.stop();
                 }
             }
-            CountDownLatch endEventRecevied = new CountDownLatch(1);
-            CountDownLatch emitEvent = new CountDownLatch(1);
-            try (EventStream s = EventStream.openRepository()) {
-                s.onEvent("Chunk", e -> {
-                    if (e.getBoolean("end")) {
-                        endEventRecevied.countDown();
-                        return;
-                    }
-                    System.out.println("Stream should start at latest event:");
-                    System.out.println(e);
-                });
+
+            // Create an event in a segment, typically the first.
+            NotLatestEvent notLatest = new NotLatestEvent();
+            notLatest.commit();
 
-                ChunkEvent e1 = new ChunkEvent();
-                e1.end = false;
-                e1.commit();
+            try (EventStream s = EventStream.openRepository()) {
+                awaitFlush(r); // ensure that NotLatest is included
                 s.startAsync();
-                s.onFlush(() -> {
-                    emitEvent.countDown();
+                AtomicBoolean foundLatest = new AtomicBoolean();
+                // Emit the latest event
+                LatestEvent latest = new LatestEvent();
+                latest.commit();
+                s.onEvent(event -> {
+                    String name = event.getEventType().getName();
+                    System.out.println("Found event " + name);
+                    foundLatest.set(name.equals("Latest"));
+                    s.close();
                 });
-                emitEvent.await();
-                ChunkEvent e2 = new ChunkEvent();
-                e2.end = true;
-                e2.commit();
-
-                endEventRecevied.await();
+                s.awaitTermination();
+                if (!foundLatest.get()) {
+                    throw new Exception("Didn't find latest event!");
+                }
             }
         }
     }
+
+    private static void awaitFlush(RecordingStream r) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        r.onFlush(() -> {
+            latch.countDown();
+        });
+        latch.await();
+    }
 }