--- a/test/jdk/jdk/jfr/api/consumer/streaming/TestEmptyChunks.java Wed Oct 23 23:47:56 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestEmptyChunks.java Thu Oct 24 07:02:36 2019 +0200
@@ -45,11 +45,19 @@
public static void main(String... args) throws Exception {
CountDownLatch end = new CountDownLatch(1);
+ CountDownLatch firstFlush = new CountDownLatch(1);
try (RecordingStream es = new RecordingStream()) {
es.onEvent(EndEvent.class.getName(), e -> {
end.countDown();
});
+ es.onFlush(() -> {
+ firstFlush.countDown();
+ });
es.startAsync();
+ System.out.println("Invoked startAsync()");
+ // Wait for stream thread to start
+ firstFlush.await();
+ System.out.println("Stream thread active");
Recording r1 = new Recording();
r1.start();
System.out.println("Chunk 1 started");
--- a/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java Wed Oct 23 23:47:56 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java Thu Oct 24 07:02:36 2019 +0200
@@ -82,17 +82,21 @@
NotLatestEvent notLatest = new NotLatestEvent();
notLatest.commit();
try (EventStream s = EventStream.openRepository()) {
+ // Wait for next segment
+ // to prevent flush to prevent NotLatest to be included
+ awaitFlush(r);
System.out.println("EventStream opened");
- awaitFlush(r); // ensure that NotLatest is included
- s.startAsync();
AtomicBoolean foundLatest = new AtomicBoolean();
- System.out.println("Added onEvent handler");
s.onEvent(event -> {
String name = event.getEventType().getName();
System.out.println("Found event " + name);
foundLatest.set(name.equals("Latest"));
s.close();
});
+ System.out.println("Added onEvent handler");
+ s.startAsync();
+ // wait for next segment
+ awaitFlush(s);
// Emit the latest event
LatestEvent latest = new LatestEvent();
latest.commit();
@@ -106,14 +110,15 @@
}
}
- private static void awaitFlush(RecordingStream r) throws InterruptedException {
+ private static void awaitFlush(EventStream stream) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
System.out.println("Waiting for flush...");
- r.onFlush(() -> {
+ final Runnable l = () -> {
System.out.println("Flush arrived!");
latch.countDown();
- });
+ };
+ stream.onFlush(l);
latch.await();
-
+ stream.remove(l);
}
}