28 import java.io.IOException; |
28 import java.io.IOException; |
29 import java.nio.file.Path; |
29 import java.nio.file.Path; |
30 import java.nio.file.Paths; |
30 import java.nio.file.Paths; |
31 import java.time.Duration; |
31 import java.time.Duration; |
32 import java.time.Instant; |
32 import java.time.Instant; |
|
33 import java.util.concurrent.CountDownLatch; |
33 import java.util.concurrent.atomic.AtomicBoolean; |
34 import java.util.concurrent.atomic.AtomicBoolean; |
|
35 import java.util.concurrent.atomic.AtomicInteger; |
34 |
36 |
35 import jdk.jfr.Event; |
37 import jdk.jfr.Event; |
36 import jdk.jfr.Name; |
38 import jdk.jfr.Name; |
37 import jdk.jfr.Recording; |
39 import jdk.jfr.Recording; |
38 import jdk.jfr.StackTrace; |
40 import jdk.jfr.StackTrace; |
39 import jdk.jfr.Timestamp; |
|
40 import jdk.jfr.consumer.EventStream; |
41 import jdk.jfr.consumer.EventStream; |
41 import jdk.jfr.consumer.RecordedEvent; |
42 import jdk.jfr.consumer.RecordedEvent; |
42 import jdk.jfr.consumer.RecordingFile; |
43 import jdk.jfr.consumer.RecordingFile; |
43 import jdk.jfr.consumer.RecordingStream; |
44 import jdk.jfr.consumer.RecordingStream; |
44 |
45 |
57 public final static class Mark extends Event { |
58 public final static class Mark extends Event { |
58 public boolean before; |
59 public boolean before; |
59 } |
60 } |
60 |
61 |
61 public static void main(String... args) throws Exception { |
62 public static void main(String... args) throws Exception { |
62 testEventStream(); |
63 // testEventStream(); |
63 testRecordingStream(); |
64 testRecordingStream(); |
64 } |
65 } |
65 |
66 |
66 @StackTrace(false) |
|
67 public final static class Now extends Event { |
|
68 @Timestamp(Timestamp.MILLISECONDS_SINCE_EPOCH) |
|
69 public long timestamp; |
|
70 } |
|
71 |
|
72 private static void testRecordingStream() throws Exception { |
67 private static void testRecordingStream() throws Exception { |
73 try (RecordingStream rs = new RecordingStream()) { |
68 while (true) { |
74 AtomicBoolean fail = new AtomicBoolean(); |
69 CountDownLatch closed = new CountDownLatch(1); |
75 Instant endTime = Instant.now().plus(Duration.ofSeconds(1)); |
70 AtomicInteger count = new AtomicInteger(); |
76 rs.setReuse(false); |
71 try (RecordingStream rs = new RecordingStream()) { |
77 rs.onEvent(e -> { |
72 rs.setFlushInterval(Duration.ofSeconds(1)); |
78 if (e.getEndTime().isAfter(endTime)) { |
73 rs.onEvent(e -> { |
79 fail.set(true); |
74 count.incrementAndGet(); |
|
75 }); |
|
76 // when end is reached stream is closed |
|
77 rs.onClose(() -> { |
|
78 closed.countDown(); |
|
79 }); |
|
80 Instant endTime = Instant.now().plus(Duration.ofMillis(10_000)); |
|
81 System.out.println("Setting end time: " + endTime); |
|
82 rs.setEndTime(endTime); |
|
83 rs.startAsync(); |
|
84 for (int i = 0; i < 50; i++) { |
|
85 Mark m = new Mark(); |
|
86 m.commit(); |
|
87 Thread.sleep(10); |
80 } |
88 } |
81 }); |
89 closed.await(); |
82 rs.setEndTime(endTime); |
90 System.out.println("Found events: " + count.get()); |
83 rs.startAsync(); |
91 if (count.get() < 50) { |
84 for (int i = 0; i < 100; i++) { |
92 return; |
85 Now m = new Now(); |
93 } |
86 m.commit(); |
94 System.out.println("Found 50 events. Retrying"); |
87 Thread.sleep(10); |
95 System.out.println(); |
88 } |
96 } |
89 } |
97 } |
90 } |
98 } |
91 |
99 |
92 private static void testEventStream() throws InterruptedException, IOException, Exception { |
100 static void testEventStream() throws InterruptedException, IOException, Exception { |
93 try (Recording r = new Recording()) { |
101 try (Recording r = new Recording()) { |
94 r.setFlushInterval(Duration.ofSeconds(1)); |
102 r.setFlushInterval(Duration.ofSeconds(1)); |
95 r.start(); |
103 r.start(); |
96 Mark event1 = new Mark(); |
104 Mark event1 = new Mark(); |
97 event1.begin(); // start time |
105 event1.begin(); // start time |