test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java
changeset 59327 2c3578aa0bdf
parent 59310 72f3dd43dd28
equal deleted inserted replaced
59326:851a389fc54d 59327:2c3578aa0bdf
    24  */
    24  */
    25 
    25 
    26 package jdk.jfr.api.consumer.recordingstream;
    26 package jdk.jfr.api.consumer.recordingstream;
    27 
    27 
    28 import java.time.Instant;
    28 import java.time.Instant;
    29 import java.util.concurrent.CompletableFuture;
       
    30 import java.util.concurrent.CountDownLatch;
    29 import java.util.concurrent.CountDownLatch;
    31 import java.util.concurrent.atomic.AtomicLong;
       
    32 import java.util.concurrent.atomic.AtomicReference;
    30 import java.util.concurrent.atomic.AtomicReference;
    33 
    31 
    34 import jdk.jfr.Event;
    32 import jdk.jfr.Event;
    35 import jdk.jfr.Recording;
    33 import jdk.jfr.Recording;
    36 import jdk.jfr.consumer.EventStream;
    34 import jdk.jfr.consumer.EventStream;
    39 /**
    37 /**
    40  * @test
    38  * @test
    41  * @summary Tests RecordingStream::close()
    39  * @summary Tests RecordingStream::close()
    42  * @key jfr
    40  * @key jfr
    43  * @requires vm.hasJFR
    41  * @requires vm.hasJFR
    44  * @library /test/lib
    42  * @library /test/lib /test/jdk
    45  * @run main/othervm jdk.jfr.api.consumer.recordingstream.TestClose
    43  * @run main/othervm jdk.jfr.api.consumer.recordingstream.TestClose
    46  */
    44  */
    47 public class TestClose {
    45 public class TestClose {
    48 
    46 
    49     private static class CloseEvent extends Event {
    47     private static class CloseEvent extends Event {
    56         testCloseStreaming();
    54         testCloseStreaming();
    57         testCloseMySelf();
    55         testCloseMySelf();
    58         testCloseNoEvents();
    56         testCloseNoEvents();
    59     }
    57     }
    60 
    58 
    61     private static void testCloseMySelf() throws Exception {
    59     private static void testCloseUnstarted() {
    62         log("Entering testCloseMySelf()");
    60         System.out.println("testCloseUnstarted()");
    63         CountDownLatch l1 = new CountDownLatch(1);
    61 
    64         CountDownLatch l2 = new CountDownLatch(1);
    62         try (RecordingStream r = new RecordingStream()) {
    65         RecordingStream r = new RecordingStream();
    63             r.close();
    66         r.onEvent(e -> {
    64         }
    67             try {
    65     }
    68                 l1.await();
    66 
    69                 r.close();
    67     private static void testCloseStarted() {
    70                 l2.countDown();
    68         System.out.println("testCloseStarted()");
    71             } catch (InterruptedException ie) {
    69 
    72                 throw new Error(ie);
    70         try (RecordingStream r = new RecordingStream()) {
    73             }
    71             r.startAsync();
    74         });
    72         } // <- Close
    75         r.startAsync();
    73     }
    76         CloseEvent c = new CloseEvent();
    74 
    77         c.commit();
    75     private static void testCloseTwice() {
    78         l1.countDown();
    76         System.out.println("Entering testCloseTwice()");
    79         l2.await();
    77 
    80         log("Leaving testCloseMySelf()");
    78         try (RecordingStream r = new RecordingStream()) {
       
    79             r.startAsync();
       
    80             r.close();
       
    81         } // <- Second close
    81     }
    82     }
    82 
    83 
    83     private static void testCloseStreaming() throws Exception {
    84     private static void testCloseStreaming() throws Exception {
    84         log("Entering testCloseStreaming()");
    85         System.out.println("Entering testCloseStreaming()");
       
    86 
       
    87         EventProducer p = new EventProducer();
       
    88         p.start();
    85         CountDownLatch streaming = new CountDownLatch(1);
    89         CountDownLatch streaming = new CountDownLatch(1);
    86         RecordingStream r = new RecordingStream();
    90         try (RecordingStream r = new RecordingStream()) {
    87         AtomicLong count = new AtomicLong();
    91             r.onEvent(e -> {
    88         r.onEvent(e -> {
       
    89             if (count.incrementAndGet() > 100) {
       
    90                 streaming.countDown();
    92                 streaming.countDown();
    91             }
    93             });
    92         });
    94             r.startAsync();
    93         r.startAsync();
    95             streaming.await();
    94         var streamingLoop = CompletableFuture.runAsync(() -> {
    96         } // <- Close
    95             while (true) {
    97         p.kill();
    96                 CloseEvent c = new CloseEvent();
       
    97                 c.commit();
       
    98             }
       
    99         });
       
   100         streaming.await();
       
   101         r.close();
       
   102         streamingLoop.cancel(true);
       
   103         log("Leaving testCloseStreaming()");
       
   104     }
    98     }
   105 
    99 
   106     private static void testCloseStarted() {
   100     private static void testCloseMySelf() throws Exception {
   107         log("Entering testCloseStarted()");
   101         System.out.println("testCloseMySelf()");
   108         RecordingStream r = new RecordingStream();
       
   109         r.startAsync();
       
   110         r.close();
       
   111         log("Leaving testCloseStarted()");
       
   112     }
       
   113 
   102 
   114     private static void testCloseUnstarted() {
   103         CountDownLatch closed = new CountDownLatch(1);
   115         log("Entering testCloseUnstarted()");
   104         try (RecordingStream r = new RecordingStream()) {
   116         RecordingStream r = new RecordingStream();
   105             r.onEvent(e -> {
   117         r.close();
   106                 r.close();  // <- Close
   118         log("Leaving testCloseUnstarted()");
   107                 closed.countDown();
   119     }
   108             });
   120 
   109             r.startAsync();
   121     private static void testCloseTwice() {
   110             CloseEvent c = new CloseEvent();
   122         log("Entering testCloseTwice()");
   111             c.commit();
   123         RecordingStream r = new RecordingStream();
   112             closed.await();
   124         r.startAsync();
   113         }
   125         r.close();
       
   126         r.close();
       
   127         log("Leaving testCloseTwice()");
       
   128     }
   114     }
   129 
   115 
   130     private static void testCloseNoEvents() throws Exception {
   116     private static void testCloseNoEvents() throws Exception {
       
   117         System.out.println("testCloseNoEvents()");
       
   118 
   131         try (Recording r = new Recording()) {
   119         try (Recording r = new Recording()) {
   132             r.start();
   120             r.start();
   133             CountDownLatch finished = new CountDownLatch(2);
   121             CountDownLatch finished = new CountDownLatch(2);
   134             AtomicReference<Thread> streamingThread = new AtomicReference<>();
   122             AtomicReference<Thread> streamingThread = new AtomicReference<>();
   135             try (EventStream es = EventStream.openRepository()) {
   123             try (EventStream es = EventStream.openRepository()) {
   136                 es.setStartTime(Instant.EPOCH);
   124                 es.setStartTime(Instant.EPOCH);
   137                 es.onFlush( () -> {
   125                 es.onFlush(() -> {
   138                     streamingThread.set(Thread.currentThread());
   126                     streamingThread.set(Thread.currentThread());
   139                     finished.countDown();;
   127                     finished.countDown();
   140                 });
   128                 });
   141                 es.startAsync();
   129                 es.startAsync();
   142                 finished.await();
   130                 finished.await();
   143             } // <- EventStream::close should terminate thread
   131             } // <- Close should terminate thread
   144             while (streamingThread.get().isAlive()) {
   132             while (streamingThread.get().isAlive()) {
   145                 Thread.sleep(10);
   133                 Thread.sleep(10);
   146             }
   134             }
   147         }
   135         }
   148     }
   136     }
   149 
       
   150     private static void log(String msg) {
       
   151         System.out.println(msg);
       
   152     }
       
   153 }
   137 }