--- a/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java Thu Oct 17 09:21:00 2019 -0700
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java Fri Oct 18 17:45:17 2019 +0200
@@ -26,6 +26,7 @@
package jdk.jfr.api.consumer.streaming;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import jdk.jfr.Event;
import jdk.jfr.Name;
@@ -43,50 +44,68 @@
*/
public class TestLatestEvent {
- @Name("Chunk")
- static class ChunkEvent extends Event {
- boolean end;
+ @Name("NotLatest")
+ static class NotLatestEvent extends Event {
+ }
+
+ @Name("Latest")
+ static class LatestEvent extends Event {
+ }
+
+ @Name("MakeChunks")
+ static class MakeChunks extends Event {
}
public static void main(String... args) throws Exception {
try (RecordingStream r = new RecordingStream()) {
r.startAsync();
- // Create chunks with events in the repository
+ MakeChunks e = new MakeChunks();
+ e.commit();
+ CountDownLatch beginChunks = new CountDownLatch(1);
+ r.onEvent("MakeChunks", event-> {
+ beginChunks.countDown();
+ });
+ beginChunks.await();
+ // Create 5 chunks with events in the repository
for (int i = 0; i < 5; i++) {
try (Recording r1 = new Recording()) {
r1.start();
- ChunkEvent e = new ChunkEvent();
- e.end = false;
- e.commit();
+ NotLatestEvent notLatest = new NotLatestEvent();
+ notLatest.commit();
r1.stop();
}
}
- CountDownLatch endEventRecevied = new CountDownLatch(1);
- CountDownLatch emitEvent = new CountDownLatch(1);
- try (EventStream s = EventStream.openRepository()) {
- s.onEvent("Chunk", e -> {
- if (e.getBoolean("end")) {
- endEventRecevied.countDown();
- return;
- }
- System.out.println("Stream should start at latest event:");
- System.out.println(e);
- });
+
+ // Create an event in a segment, typically the first.
+ NotLatestEvent notLatest = new NotLatestEvent();
+ notLatest.commit();
- ChunkEvent e1 = new ChunkEvent();
- e1.end = false;
- e1.commit();
+ try (EventStream s = EventStream.openRepository()) {
+ awaitFlush(r); // ensure that NotLatest is included
s.startAsync();
- s.onFlush(() -> {
- emitEvent.countDown();
+ AtomicBoolean foundLatest = new AtomicBoolean();
+ // Emit the latest event
+ LatestEvent latest = new LatestEvent();
+ latest.commit();
+ s.onEvent(event -> {
+ String name = event.getEventType().getName();
+ System.out.println("Found event " + name);
+ foundLatest.set(name.equals("Latest"));
+ s.close();
});
- emitEvent.await();
- ChunkEvent e2 = new ChunkEvent();
- e2.end = true;
- e2.commit();
-
- endEventRecevied.await();
+ s.awaitTermination();
+ if (!foundLatest.get()) {
+ throw new Exception("Didn't find latest event!");
+ }
}
}
}
+
+ private static void awaitFlush(RecordingStream r) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ r.onFlush(() -> {
+ latch.countDown();
+ });
+ latch.await();
+ }
}