41 * @library /test/lib |
42 * @library /test/lib |
42 * @run main/othervm jdk.jfr.api.consumer.streaming.TestLatestEvent |
43 * @run main/othervm jdk.jfr.api.consumer.streaming.TestLatestEvent |
43 */ |
44 */ |
44 public class TestLatestEvent { |
45 public class TestLatestEvent { |
45 |
46 |
46 @Name("Chunk") |
47 @Name("NotLatest") |
47 static class ChunkEvent extends Event { |
48 static class NotLatestEvent extends Event { |
48 boolean end; |
49 } |
|
50 |
|
51 @Name("Latest") |
|
52 static class LatestEvent extends Event { |
|
53 } |
|
54 |
|
55 @Name("MakeChunks") |
|
56 static class MakeChunks extends Event { |
49 } |
57 } |
50 |
58 |
51 public static void main(String... args) throws Exception { |
59 public static void main(String... args) throws Exception { |
52 try (RecordingStream r = new RecordingStream()) { |
60 try (RecordingStream r = new RecordingStream()) { |
53 r.startAsync(); |
61 r.startAsync(); |
54 // Create chunks with events in the repository |
62 MakeChunks e = new MakeChunks(); |
|
63 e.commit(); |
|
64 CountDownLatch beginChunks = new CountDownLatch(1); |
|
65 r.onEvent("MakeChunks", event-> { |
|
66 beginChunks.countDown(); |
|
67 }); |
|
68 beginChunks.await(); |
|
69 // Create 5 chunks with events in the repository |
55 for (int i = 0; i < 5; i++) { |
70 for (int i = 0; i < 5; i++) { |
56 try (Recording r1 = new Recording()) { |
71 try (Recording r1 = new Recording()) { |
57 r1.start(); |
72 r1.start(); |
58 ChunkEvent e = new ChunkEvent(); |
73 NotLatestEvent notLatest = new NotLatestEvent(); |
59 e.end = false; |
74 notLatest.commit(); |
60 e.commit(); |
|
61 r1.stop(); |
75 r1.stop(); |
62 } |
76 } |
63 } |
77 } |
64 CountDownLatch endEventRecevied = new CountDownLatch(1); |
78 |
65 CountDownLatch emitEvent = new CountDownLatch(1); |
79 // Create an event in a segment, typically the first. |
|
80 NotLatestEvent notLatest = new NotLatestEvent(); |
|
81 notLatest.commit(); |
|
82 |
66 try (EventStream s = EventStream.openRepository()) { |
83 try (EventStream s = EventStream.openRepository()) { |
67 s.onEvent("Chunk", e -> { |
84 awaitFlush(r); // ensure that NotLatest is included |
68 if (e.getBoolean("end")) { |
85 s.startAsync(); |
69 endEventRecevied.countDown(); |
86 AtomicBoolean foundLatest = new AtomicBoolean(); |
70 return; |
87 // Emit the latest event |
71 } |
88 LatestEvent latest = new LatestEvent(); |
72 System.out.println("Stream should start at latest event:"); |
89 latest.commit(); |
73 System.out.println(e); |
90 s.onEvent(event -> { |
|
91 String name = event.getEventType().getName(); |
|
92 System.out.println("Found event " + name); |
|
93 foundLatest.set(name.equals("Latest")); |
|
94 s.close(); |
74 }); |
95 }); |
75 |
96 s.awaitTermination(); |
76 ChunkEvent e1 = new ChunkEvent(); |
97 if (!foundLatest.get()) { |
77 e1.end = false; |
98 throw new Exception("Didn't find latest event!"); |
78 e1.commit(); |
99 } |
79 s.startAsync(); |
|
80 s.onFlush(() -> { |
|
81 emitEvent.countDown(); |
|
82 }); |
|
83 emitEvent.await(); |
|
84 ChunkEvent e2 = new ChunkEvent(); |
|
85 e2.end = true; |
|
86 e2.commit(); |
|
87 |
|
88 endEventRecevied.await(); |
|
89 } |
100 } |
90 } |
101 } |
91 } |
102 } |
|
103 |
|
104 private static void awaitFlush(RecordingStream r) throws InterruptedException { |
|
105 CountDownLatch latch = new CountDownLatch(1); |
|
106 r.onFlush(() -> { |
|
107 latch.countDown(); |
|
108 }); |
|
109 latch.await(); |
|
110 } |
92 } |
111 } |