test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java
branch JEP-349-branch
changeset 58774 141412e96b12
parent 58724 3d0a172353fc
child 58806 a7d850b47b19
equal deleted inserted replaced
58771:e6feb2874fa6 58774:141412e96b12
    80             System.out.println("All empty chunks created");
    80             System.out.println("All empty chunks created");
    81             // Create an event in a segment, typically the first.
    81             // Create an event in a segment, typically the first.
    82             NotLatestEvent notLatest = new NotLatestEvent();
    82             NotLatestEvent notLatest = new NotLatestEvent();
    83             notLatest.commit();
    83             notLatest.commit();
    84             try (EventStream s = EventStream.openRepository()) {
    84             try (EventStream s = EventStream.openRepository()) {
       
    85                 // Wait for next segment
       
    86                 // to prevent flush to prevent NotLatest to be included
       
    87                 awaitFlush(r);
    85                 System.out.println("EventStream opened");
    88                 System.out.println("EventStream opened");
    86                 awaitFlush(r); // ensure that NotLatest is included
       
    87                 s.startAsync();
       
    88                 AtomicBoolean foundLatest = new AtomicBoolean();
    89                 AtomicBoolean foundLatest = new AtomicBoolean();
    89                 System.out.println("Added onEvent handler");
       
    90                 s.onEvent(event -> {
    90                 s.onEvent(event -> {
    91                     String name = event.getEventType().getName();
    91                     String name = event.getEventType().getName();
    92                     System.out.println("Found event " + name);
    92                     System.out.println("Found event " + name);
    93                     foundLatest.set(name.equals("Latest"));
    93                     foundLatest.set(name.equals("Latest"));
    94                     s.close();
    94                     s.close();
    95                 });
    95                 });
       
    96                 System.out.println("Added onEvent handler");
       
    97                 s.startAsync();
       
    98                 // wait for next segment
       
    99                 awaitFlush(s);
    96                 // Emit the latest event
   100                 // Emit the latest event
    97                 LatestEvent latest = new LatestEvent();
   101                 LatestEvent latest = new LatestEvent();
    98                 latest.commit();
   102                 latest.commit();
    99                 System.out.println("Latest event emitted");
   103                 System.out.println("Latest event emitted");
   100                 System.out.println("Waiting for termination");
   104                 System.out.println("Waiting for termination");
   104                 }
   108                 }
   105             }
   109             }
   106         }
   110         }
   107     }
   111     }
   108 
   112 
   109     private static void awaitFlush(RecordingStream r) throws InterruptedException {
   113     private static void awaitFlush(EventStream stream) throws InterruptedException {
   110         CountDownLatch latch = new CountDownLatch(1);
   114         CountDownLatch latch = new CountDownLatch(1);
   111         System.out.println("Waiting for flush...");
   115         System.out.println("Waiting for flush...");
   112         r.onFlush(() -> {
   116         final Runnable l =  () -> {
   113             System.out.println("Flush arrived!");
   117             System.out.println("Flush arrived!");
   114             latch.countDown();
   118             latch.countDown();
   115         });
   119         };
       
   120         stream.onFlush(l);
   116         latch.await();
   121         latch.await();
   117 
   122         stream.remove(l);
   118     }
   123     }
   119 }
   124 }