56 testCloseStreaming(); |
54 testCloseStreaming(); |
57 testCloseMySelf(); |
55 testCloseMySelf(); |
58 testCloseNoEvents(); |
56 testCloseNoEvents(); |
59 } |
57 } |
60 |
58 |
61 private static void testCloseMySelf() throws Exception { |
59 private static void testCloseUnstarted() { |
62 log("Entering testCloseMySelf()"); |
60 System.out.println("testCloseUnstarted()"); |
63 CountDownLatch l1 = new CountDownLatch(1); |
61 |
64 CountDownLatch l2 = new CountDownLatch(1); |
62 try (RecordingStream r = new RecordingStream()) { |
65 RecordingStream r = new RecordingStream(); |
63 r.close(); |
66 r.onEvent(e -> { |
64 } |
67 try { |
65 } |
68 l1.await(); |
66 |
69 r.close(); |
67 private static void testCloseStarted() { |
70 l2.countDown(); |
68 System.out.println("testCloseStarted()"); |
71 } catch (InterruptedException ie) { |
69 |
72 throw new Error(ie); |
70 try (RecordingStream r = new RecordingStream()) { |
73 } |
71 r.startAsync(); |
74 }); |
72 } // <- Close |
75 r.startAsync(); |
73 } |
76 CloseEvent c = new CloseEvent(); |
74 |
77 c.commit(); |
75 private static void testCloseTwice() { |
78 l1.countDown(); |
76 System.out.println("Entering testCloseTwice()"); |
79 l2.await(); |
77 |
80 log("Leaving testCloseMySelf()"); |
78 try (RecordingStream r = new RecordingStream()) { |
|
79 r.startAsync(); |
|
80 r.close(); |
|
81 } // <- Second close |
81 } |
82 } |
82 |
83 |
83 private static void testCloseStreaming() throws Exception { |
84 private static void testCloseStreaming() throws Exception { |
84 log("Entering testCloseStreaming()"); |
85 System.out.println("Entering testCloseStreaming()"); |
|
86 |
|
87 EventProducer p = new EventProducer(); |
|
88 p.start(); |
85 CountDownLatch streaming = new CountDownLatch(1); |
89 CountDownLatch streaming = new CountDownLatch(1); |
86 RecordingStream r = new RecordingStream(); |
90 try (RecordingStream r = new RecordingStream()) { |
87 AtomicLong count = new AtomicLong(); |
91 r.onEvent(e -> { |
88 r.onEvent(e -> { |
|
89 if (count.incrementAndGet() > 100) { |
|
90 streaming.countDown(); |
92 streaming.countDown(); |
91 } |
93 }); |
92 }); |
94 r.startAsync(); |
93 r.startAsync(); |
95 streaming.await(); |
94 var streamingLoop = CompletableFuture.runAsync(() -> { |
96 } // <- Close |
95 while (true) { |
97 p.kill(); |
96 CloseEvent c = new CloseEvent(); |
|
97 c.commit(); |
|
98 } |
|
99 }); |
|
100 streaming.await(); |
|
101 r.close(); |
|
102 streamingLoop.cancel(true); |
|
103 log("Leaving testCloseStreaming()"); |
|
104 } |
98 } |
105 |
99 |
106 private static void testCloseStarted() { |
100 private static void testCloseMySelf() throws Exception { |
107 log("Entering testCloseStarted()"); |
101 System.out.println("testCloseMySelf()"); |
108 RecordingStream r = new RecordingStream(); |
|
109 r.startAsync(); |
|
110 r.close(); |
|
111 log("Leaving testCloseStarted()"); |
|
112 } |
|
113 |
102 |
114 private static void testCloseUnstarted() { |
103 CountDownLatch closed = new CountDownLatch(1); |
115 log("Entering testCloseUnstarted()"); |
104 try (RecordingStream r = new RecordingStream()) { |
116 RecordingStream r = new RecordingStream(); |
105 r.onEvent(e -> { |
117 r.close(); |
106 r.close(); // <- Close |
118 log("Leaving testCloseUnstarted()"); |
107 closed.countDown(); |
119 } |
108 }); |
120 |
109 r.startAsync(); |
121 private static void testCloseTwice() { |
110 CloseEvent c = new CloseEvent(); |
122 log("Entering testCloseTwice()"); |
111 c.commit(); |
123 RecordingStream r = new RecordingStream(); |
112 closed.await(); |
124 r.startAsync(); |
113 } |
125 r.close(); |
|
126 r.close(); |
|
127 log("Leaving testCloseTwice()"); |
|
128 } |
114 } |
129 |
115 |
130 private static void testCloseNoEvents() throws Exception { |
116 private static void testCloseNoEvents() throws Exception { |
|
117 System.out.println("testCloseNoEvents()"); |
|
118 |
131 try (Recording r = new Recording()) { |
119 try (Recording r = new Recording()) { |
132 r.start(); |
120 r.start(); |
133 CountDownLatch finished = new CountDownLatch(2); |
121 CountDownLatch finished = new CountDownLatch(2); |
134 AtomicReference<Thread> streamingThread = new AtomicReference<>(); |
122 AtomicReference<Thread> streamingThread = new AtomicReference<>(); |
135 try (EventStream es = EventStream.openRepository()) { |
123 try (EventStream es = EventStream.openRepository()) { |
136 es.setStartTime(Instant.EPOCH); |
124 es.setStartTime(Instant.EPOCH); |
137 es.onFlush( () -> { |
125 es.onFlush(() -> { |
138 streamingThread.set(Thread.currentThread()); |
126 streamingThread.set(Thread.currentThread()); |
139 finished.countDown();; |
127 finished.countDown(); |
140 }); |
128 }); |
141 es.startAsync(); |
129 es.startAsync(); |
142 finished.await(); |
130 finished.await(); |
143 } // <- EventStream::close should terminate thread |
131 } // <- Close should terminate thread |
144 while (streamingThread.get().isAlive()) { |
132 while (streamingThread.get().isAlive()) { |
145 Thread.sleep(10); |
133 Thread.sleep(10); |
146 } |
134 } |
147 } |
135 } |
148 } |
136 } |
149 |
|
150 private static void log(String msg) { |
|
151 System.out.println(msg); |
|
152 } |
|
153 } |
137 } |