--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetEndTime.java Thu Oct 17 09:21:00 2019 -0700
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetEndTime.java Fri Oct 18 17:45:17 2019 +0200
@@ -25,19 +25,25 @@
package jdk.jfr.api.consumer.recordingstream;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import jdk.jfr.Event;
import jdk.jfr.Name;
import jdk.jfr.Recording;
import jdk.jfr.StackTrace;
+import jdk.jfr.Timestamp;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingFile;
+import jdk.jfr.consumer.RecordingStream;
+import jdk.test.lib.jfr.Events;
/**
* @test
@@ -56,17 +62,48 @@
}
public static void main(String... args) throws Exception {
+ testEventStream();
+ testRecordingStream();
+ }
+
+ @StackTrace(false)
+ public final static class Now extends Event {
+ @Timestamp(Timestamp.MILLISECONDS_SINCE_EPOCH)
+ public long timestamp;
+ }
+
+ private static void testRecordingStream() throws Exception {
+ try (RecordingStream rs = new RecordingStream()) {
+ AtomicBoolean fail = new AtomicBoolean();
+ Instant endTime = Instant.now().plus(Duration.ofSeconds(1));
+ rs.setReuse(false);
+ rs.onEvent(e -> {
+ if (e.getEndTime().isAfter(endTime)) {
+ fail.set(true);
+ }
+ });
+ rs.setEndTime(endTime);
+ rs.startAsync();
+ for (int i = 0; i < 100; i++) {
+ Now m = new Now();
+ m.commit();
+ Thread.sleep(10);
+ }
+ }
+ }
+
+ private static void testEventStream() throws InterruptedException, IOException, Exception {
try (Recording r = new Recording()) {
r.setFlushInterval(Duration.ofSeconds(1));
r.start();
Mark event1 = new Mark();
- event1.begin(); // start time
+ event1.begin(); // start time
event1.before = true;
advanceClock();
event1.commit();
Mark event2 = new Mark();
- event2.begin(); // end time
+ event2.begin(); // end time
advanceClock();
Thread.sleep(100);
event2.before = false;
--- a/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java Thu Oct 17 09:21:00 2019 -0700
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java Fri Oct 18 17:45:17 2019 +0200
@@ -26,6 +26,7 @@
package jdk.jfr.api.consumer.streaming;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import jdk.jfr.Event;
import jdk.jfr.Name;
@@ -43,50 +44,68 @@
*/
public class TestLatestEvent {
- @Name("Chunk")
- static class ChunkEvent extends Event {
- boolean end;
+ @Name("NotLatest")
+ static class NotLatestEvent extends Event {
+ }
+
+ @Name("Latest")
+ static class LatestEvent extends Event {
+ }
+
+ @Name("MakeChunks")
+ static class MakeChunks extends Event {
}
public static void main(String... args) throws Exception {
try (RecordingStream r = new RecordingStream()) {
r.startAsync();
- // Create chunks with events in the repository
+ MakeChunks e = new MakeChunks();
+ e.commit();
+ CountDownLatch beginChunks = new CountDownLatch(1);
+ r.onEvent("MakeChunks", event-> {
+ beginChunks.countDown();
+ });
+ beginChunks.await();
+ // Create 5 chunks with events in the repository
for (int i = 0; i < 5; i++) {
try (Recording r1 = new Recording()) {
r1.start();
- ChunkEvent e = new ChunkEvent();
- e.end = false;
- e.commit();
+ NotLatestEvent notLatest = new NotLatestEvent();
+ notLatest.commit();
r1.stop();
}
}
- CountDownLatch endEventRecevied = new CountDownLatch(1);
- CountDownLatch emitEvent = new CountDownLatch(1);
- try (EventStream s = EventStream.openRepository()) {
- s.onEvent("Chunk", e -> {
- if (e.getBoolean("end")) {
- endEventRecevied.countDown();
- return;
- }
- System.out.println("Stream should start at latest event:");
- System.out.println(e);
- });
+
+ // Create an event in a segment, typically the first.
+ NotLatestEvent notLatest = new NotLatestEvent();
+ notLatest.commit();
- ChunkEvent e1 = new ChunkEvent();
- e1.end = false;
- e1.commit();
+ try (EventStream s = EventStream.openRepository()) {
+ awaitFlush(r); // ensure that NotLatest is included
s.startAsync();
- s.onFlush(() -> {
- emitEvent.countDown();
+ AtomicBoolean foundLatest = new AtomicBoolean();
+ // Emit the latest event
+ LatestEvent latest = new LatestEvent();
+ latest.commit();
+ s.onEvent(event -> {
+ String name = event.getEventType().getName();
+ System.out.println("Found event " + name);
+ foundLatest.set(name.equals("Latest"));
+ s.close();
});
- emitEvent.await();
- ChunkEvent e2 = new ChunkEvent();
- e2.end = true;
- e2.commit();
-
- endEventRecevied.await();
+ s.awaitTermination();
+ if (!foundLatest.get()) {
+ throw new Exception("Didn't find latest event!");
+ }
}
}
}
+
+ private static void awaitFlush(RecordingStream r) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ r.onFlush(() -> {
+ latch.countDown();
+ });
+ latch.await();
+ }
}