63 e.commit(); |
63 e.commit(); |
64 CountDownLatch beginChunks = new CountDownLatch(1); |
64 CountDownLatch beginChunks = new CountDownLatch(1); |
65 r.onEvent("MakeChunks", event-> { |
65 r.onEvent("MakeChunks", event-> { |
66 beginChunks.countDown(); |
66 beginChunks.countDown(); |
67 }); |
67 }); |
|
68 System.out.println("Waitning for first chunk"); |
68 beginChunks.await(); |
69 beginChunks.await(); |
69 // Create 5 chunks with events in the repository |
70 // Create 5 chunks with events in the repository |
70 for (int i = 0; i < 5; i++) { |
71 for (int i = 0; i < 5; i++) { |
|
72 System.out.println("Creating empty chunk"); |
71 try (Recording r1 = new Recording()) { |
73 try (Recording r1 = new Recording()) { |
72 r1.start(); |
74 r1.start(); |
73 NotLatestEvent notLatest = new NotLatestEvent(); |
75 NotLatestEvent notLatest = new NotLatestEvent(); |
74 notLatest.commit(); |
76 notLatest.commit(); |
75 r1.stop(); |
77 r1.stop(); |
76 } |
78 } |
77 } |
79 } |
78 |
80 System.out.println("All empty chunks created"); |
79 // Create an event in a segment, typically the first. |
81 // Create an event in a segment, typically the first. |
80 NotLatestEvent notLatest = new NotLatestEvent(); |
82 NotLatestEvent notLatest = new NotLatestEvent(); |
81 notLatest.commit(); |
83 notLatest.commit(); |
82 |
|
83 try (EventStream s = EventStream.openRepository()) { |
84 try (EventStream s = EventStream.openRepository()) { |
|
85 System.out.println("EventStream opened"); |
84 awaitFlush(r); // ensure that NotLatest is included |
86 awaitFlush(r); // ensure that NotLatest is included |
85 s.startAsync(); |
87 s.startAsync(); |
86 AtomicBoolean foundLatest = new AtomicBoolean(); |
88 AtomicBoolean foundLatest = new AtomicBoolean(); |
87 // Emit the latest event |
89 System.out.println("Added onEvent handler"); |
88 LatestEvent latest = new LatestEvent(); |
|
89 latest.commit(); |
|
90 s.onEvent(event -> { |
90 s.onEvent(event -> { |
91 String name = event.getEventType().getName(); |
91 String name = event.getEventType().getName(); |
92 System.out.println("Found event " + name); |
92 System.out.println("Found event " + name); |
93 foundLatest.set(name.equals("Latest")); |
93 foundLatest.set(name.equals("Latest")); |
94 s.close(); |
94 s.close(); |
95 }); |
95 }); |
|
96 // Emit the latest event |
|
97 LatestEvent latest = new LatestEvent(); |
|
98 latest.commit(); |
|
99 System.out.println("Latest event emitted"); |
|
100 System.out.println("Waiting for termination"); |
96 s.awaitTermination(); |
101 s.awaitTermination(); |
97 if (!foundLatest.get()) { |
102 if (!foundLatest.get()) { |
98 throw new Exception("Didn't find latest event!"); |
103 throw new Exception("Didn't find latest event!"); |
99 } |
104 } |
100 } |
105 } |
101 } |
106 } |
102 } |
107 } |
103 |
108 |
104 private static void awaitFlush(RecordingStream r) throws InterruptedException { |
109 private static void awaitFlush(RecordingStream r) throws InterruptedException { |
105 CountDownLatch latch = new CountDownLatch(1); |
110 CountDownLatch latch = new CountDownLatch(1); |
|
111 System.out.println("Waiting for flush..."); |
106 r.onFlush(() -> { |
112 r.onFlush(() -> { |
|
113 System.out.println("Flush arrived!"); |
107 latch.countDown(); |
114 latch.countDown(); |
108 }); |
115 }); |
109 latch.await(); |
116 latch.await(); |
|
117 |
110 } |
118 } |
111 } |
119 } |