53 |
51 |
54 // Will generate about 100 MB of data, or 8-9 chunks |
52 // Will generate about 100 MB of data, or 8-9 chunks |
55 private static final int EVENT_COUNT = 5_000_000; |
53 private static final int EVENT_COUNT = 5_000_000; |
56 |
54 |
57 public static void main(String... args) throws Exception { |
55 public static void main(String... args) throws Exception { |
58 CountDownLatch end = new CountDownLatch(1); |
56 try (RecordingStream rs = new RecordingStream()) { |
59 AtomicInteger idCounter = new AtomicInteger(); |
57 rs.onEvent(FillEvent.class.getName(), e -> { |
60 try (RecordingStream es = new RecordingStream()) { |
58 int id = e.getInt("id"); |
61 es.onEvent(EndEvent.class.getName(), e -> end.countDown()); |
59 // Some events may get lost due to |
62 es.onEvent(FillEvent.class.getName(), e -> { |
60 // buffer being full. |
63 idCounter.incrementAndGet(); |
61 if (id > EVENT_COUNT / 2) { |
64 // if (id != expected) { |
62 rs.close(); |
65 // throw new Error("Expected id " + expected + ", but got " + id); |
63 } |
66 // } |
|
67 }); |
64 }); |
68 es.startAsync(); |
65 rs.startAsync(); |
69 long seed = System.currentTimeMillis(); |
66 long seed = System.currentTimeMillis(); |
70 System.out.println("Random seed: " + seed); |
67 System.out.println("Random seed: " + seed); |
71 Random r = new Random(seed); |
68 Random r = new Random(seed); |
72 for (int i = 1; i < EVENT_COUNT; i++) { |
69 for (int i = 1; i < EVENT_COUNT; i++) { |
73 FillEvent f = new FillEvent(); |
70 FillEvent f = new FillEvent(); |
74 f.message = i %2 == 0 ? "ko" : "kak"; |
71 f.message = i % 2 == 0 ? "hello, hello, hello, hello, hello!" : "hi!"; |
75 f.value = r.nextInt(10000); |
72 f.value = r.nextInt(10000); |
76 f.id = i; |
73 f.id = i; |
77 f.commit(); |
74 f.commit(); |
78 if (i % 1_000_000 == 0) { |
75 if (i % 1_000_000 == 0) { |
79 System.out.println("Emitted " + i + " events"); |
76 System.out.println("Emitted " + i + " events"); |
80 } |
77 } |
81 } |
78 } |
82 System.out.println("Awaiting end event"); |
79 rs.awaitTermination(); |
83 Thread.sleep(1_000); |
|
84 for (int i = 1; i < EVENT_COUNT; i++) { |
|
85 EndEvent e = new EndEvent(); |
|
86 e.commit(); |
|
87 } |
|
88 end.await(); |
|
89 |
|
90 } |
80 } |
91 } |
81 } |
92 } |
82 } |