test/jdk/jdk/jfr/api/consumer/streaming/TestCrossProcessStreaming.java
changeset 59231 2b9027360909
equal deleted inserted replaced
59230:a2a921609481 59231:2b9027360909
       
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.jfr.api.consumer.streaming;
       
    27 
       
    28 import java.nio.file.Files;
       
    29 import java.nio.file.Path;
       
    30 import java.nio.file.Paths;
       
    31 import java.time.Duration;
       
    32 import java.time.Instant;
       
    33 import java.util.concurrent.CountDownLatch;
       
    34 import java.util.concurrent.TimeUnit;
       
    35 import java.util.concurrent.atomic.AtomicInteger;
       
    36 
       
    37 import com.sun.tools.attach.VirtualMachine;
       
    38 import jdk.jfr.Event;
       
    39 import jdk.jfr.Recording;
       
    40 import jdk.jfr.consumer.EventStream;
       
    41 import jdk.test.lib.Asserts;
       
    42 import jdk.test.lib.process.ProcessTools;
       
    43 
       
    44 /**
       
    45  * @test
       
    46  * @summary Test scenario where JFR event producer is in a different process
       
    47  *          with respect to the JFR event stream consumer.
       
    48  * @key jfr
       
    49  * @requires vm.hasJFR
       
    50  * @library /test/lib /test/jdk
       
    51  * @modules jdk.attach
       
    52  *          jdk.jfr
       
    53  * @run main jdk.jfr.api.consumer.streaming.TestCrossProcessStreaming
       
    54  */
       
    55 
       
    56 public class TestCrossProcessStreaming {
       
    57     static String MAIN_STARTED_TOKEN = "MAIN_STARTED";
       
    58 
       
    59     public static class TestEvent extends Event {
       
    60     }
       
    61 
       
    62     public static class ResultEvent extends Event {
       
    63         int nrOfEventsProduced;
       
    64     }
       
    65 
       
    66     public static class EventProducer {
       
    67         public static void main(String... args) throws Exception {
       
    68             CrossProcessSynchronizer sync = new CrossProcessSynchronizer();
       
    69             log(MAIN_STARTED_TOKEN);
       
    70 
       
    71             long pid = ProcessHandle.current().pid();
       
    72             int nrOfEvents = 0;
       
    73             boolean exitRequested = false;
       
    74             while (!exitRequested) {
       
    75                 TestEvent e = new TestEvent();
       
    76                 e.commit();
       
    77                 nrOfEvents++;
       
    78                 if (nrOfEvents % 1000 == 0) {
       
    79                     Thread.sleep(100);
       
    80                     exitRequested = CrossProcessSynchronizer.exitRequested(pid);
       
    81                 }
       
    82             }
       
    83 
       
    84             ResultEvent re = new ResultEvent();
       
    85             re.nrOfEventsProduced = nrOfEvents;
       
    86             re.commit();
       
    87 
       
    88             log("Number of TestEvents generated: " + nrOfEvents);
       
    89         }
       
    90     }
       
    91 
       
    92 
       
    93     static class CrossProcessSynchronizer {
       
    94         static void requestExit(long pid) throws Exception {
       
    95             Files.createFile(file(pid));
       
    96        }
       
    97 
       
    98         static boolean exitRequested(long pid) throws Exception {
       
    99             return Files.exists(file(pid));
       
   100         }
       
   101 
       
   102         static Path file(long pid) {
       
   103             return Paths.get(".", "exit-requested-" + pid);
       
   104         }
       
   105     }
       
   106 
       
   107 
       
   108     static class ConsumedEvents {
       
   109         AtomicInteger total = new AtomicInteger(0);
       
   110         AtomicInteger whileProducerAlive = new AtomicInteger(0);
       
   111         AtomicInteger produced = new AtomicInteger(-1);
       
   112     }
       
   113 
       
   114 
       
   115     public static void main(String... args) throws Exception {
       
   116         Process p = startProducerProcess("normal");
       
   117         String repo = getJfrRepository(p);
       
   118 
       
   119         ConsumedEvents ce = consumeEvents(p, repo);
       
   120 
       
   121         p.waitFor();
       
   122         Asserts.assertEquals(p.exitValue(), 0,
       
   123                              "Process exited abnormally, exitValue = " + p.exitValue());
       
   124 
       
   125         Asserts.assertEquals(ce.total.get(), ce.produced.get(), "Some events were lost");
       
   126 
       
   127         // Expected that some portion of events emitted by the producer are delivered
       
   128         // to the consumer while producer is still alive, at least one event for certain.
       
   129         Asserts.assertLTE(1, ce.whileProducerAlive.get(),
       
   130                            "Too few events are delivered while producer is alive");
       
   131     }
       
   132 
       
   133     static Process startProducerProcess(String extraParam) throws Exception {
       
   134         ProcessBuilder pb =
       
   135             ProcessTools.createJavaProcessBuilder(false,
       
   136                                                   "-XX:StartFlightRecording",
       
   137                                                   EventProducer.class.getName(),
       
   138                                                   extraParam);
       
   139         Process p = ProcessTools.startProcess("Event-Producer", pb,
       
   140                                               line -> line.equals(MAIN_STARTED_TOKEN),
       
   141                                               0, TimeUnit.SECONDS);
       
   142         return p;
       
   143     }
       
   144 
       
   145     static String getJfrRepository(Process p) throws Exception {
       
   146         String repo = null;
       
   147 
       
   148         // It may take little bit of time for the observed process to set the property after
       
   149         // the process starts, therefore read the property in a loop.
       
   150         while (repo == null) {
       
   151             VirtualMachine vm = VirtualMachine.attach(String.valueOf(p.pid()));
       
   152             repo = vm.getSystemProperties().getProperty("jdk.jfr.repository");
       
   153             vm.detach();
       
   154         }
       
   155 
       
   156         log("JFR repository = " + repo);
       
   157         return repo;
       
   158     }
       
   159 
       
   160     static ConsumedEvents consumeEvents(Process p, String repo) throws Exception {
       
   161         ConsumedEvents result = new ConsumedEvents();
       
   162 
       
   163         // wait for couple of JFR stream flushes before concluding the test
       
   164         CountDownLatch flushed = new CountDownLatch(2);
       
   165 
       
   166         // consume events produced by another process via event stream
       
   167         try (EventStream es = EventStream.openRepository(Paths.get(repo))) {
       
   168                 es.onEvent(TestEvent.class.getName(),
       
   169                            e -> {
       
   170                                result.total.incrementAndGet();
       
   171                                if (p.isAlive()) {
       
   172                                    result.whileProducerAlive.incrementAndGet();
       
   173                                }
       
   174                            });
       
   175 
       
   176                 es.onEvent(ResultEvent.class.getName(),
       
   177                            e -> result.produced.set(e.getInt("nrOfEventsProduced")));
       
   178 
       
   179                 es.onFlush( () -> flushed.countDown() );
       
   180 
       
   181                 // Setting start time to the beginning of the Epoch is a good way to start
       
   182                 // reading the stream from the very beginning.
       
   183                 es.setStartTime(Instant.EPOCH);
       
   184                 es.startAsync();
       
   185 
       
   186                 // await for certain number of flush events before concluding the test case
       
   187                 flushed.await();
       
   188                 CrossProcessSynchronizer.requestExit(p.pid());
       
   189 
       
   190                 es.awaitTermination();
       
   191             }
       
   192 
       
   193         return result;
       
   194     }
       
   195 
       
   196     private static final void log(String msg) {
       
   197         System.out.println(msg);
       
   198     }
       
   199 }