8234671: JFR api/consumer/recordingstream/TestStart.java failed due to timeout at testStartTwice()
authoregahlin
Fri, 29 Nov 2019 17:31:01 +0100
changeset 59327 2c3578aa0bdf
parent 59326 851a389fc54d
child 59328 f280911d3427
8234671: JFR api/consumer/recordingstream/TestStart.java failed due to timeout at testStartTwice() Reviewed-by: mgronlun
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java
test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java
test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java	Fri Nov 29 15:37:13 2019 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java	Fri Nov 29 17:31:01 2019 +0100
@@ -76,9 +76,10 @@
     abstract public void close();
 
     protected final Dispatcher dispatcher() {
-        if (configuration.hasChanged()) {
+        if (configuration.hasChanged()) { // quick check
             synchronized (configuration) {
                 dispatcher = new Dispatcher(configuration);
+                configuration.setChanged(false);
             }
         }
         return dispatcher;
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java	Fri Nov 29 15:37:13 2019 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java	Fri Nov 29 17:31:01 2019 +0100
@@ -190,44 +190,40 @@
      *
      * @param awaitNewEvents wait for new data.
      */
-    RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException {
+    RecordedEvent readStreamingEvent() throws IOException {
         long absoluteChunkEnd = chunkHeader.getEnd();
-        while (true) {
-            RecordedEvent event = readEvent();
-            if (event != null) {
-                return event;
-            }
-            if (!awaitNewEvents) {
-                return null;
-            }
-            long lastValid = absoluteChunkEnd;
-            long metadataPoistion = chunkHeader.getMetataPosition();
-            long contantPosition = chunkHeader.getConstantPoolPosition();
-            chunkFinished = awaitUpdatedHeader(absoluteChunkEnd, configuration.filterEnd);
-            if (chunkFinished) {
-                Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end");
-                return null;
-            }
-            absoluteChunkEnd = chunkHeader.getEnd();
-            // Read metadata and constant pools for the next segment
-            if (chunkHeader.getMetataPosition() != metadataPoistion) {
-                Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers");
-                MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata);
-                ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
-                parsers = factory.getParsers();
-                typeMap = factory.getTypeMap();
-                updateConfiguration();;
-            }
-            if (contantPosition != chunkHeader.getConstantPoolPosition()) {
-                Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
-                constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false));
-                fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart());
-                constantLookups.forEach(c -> c.getLatestPool().setResolving());
-                constantLookups.forEach(c -> c.getLatestPool().resolve());
-                constantLookups.forEach(c -> c.getLatestPool().setResolved());
-            }
-            input.position(lastValid);
+        RecordedEvent event = readEvent();
+        if (event != null) {
+            return event;
+        }
+        long lastValid = absoluteChunkEnd;
+        long metadataPosition = chunkHeader.getMetataPosition();
+        long contantPosition = chunkHeader.getConstantPoolPosition();
+        chunkFinished = awaitUpdatedHeader(absoluteChunkEnd, configuration.filterEnd);
+        if (chunkFinished) {
+            Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end");
+            return null;
         }
+        absoluteChunkEnd = chunkHeader.getEnd();
+        // Read metadata and constant pools for the next segment
+        if (chunkHeader.getMetataPosition() != metadataPosition) {
+            Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers");
+            MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata);
+            ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
+            parsers = factory.getParsers();
+            typeMap = factory.getTypeMap();
+            updateConfiguration();
+        }
+        if (contantPosition != chunkHeader.getConstantPoolPosition()) {
+            Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
+            constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false));
+            fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart());
+            constantLookups.forEach(c -> c.getLatestPool().setResolving());
+            constantLookups.forEach(c -> c.getLatestPool().resolve());
+            constantLookups.forEach(c -> c.getLatestPool().setResolved());
+        }
+        input.position(lastValid);
+        return null;
     }
 
     /**
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java	Fri Nov 29 15:37:13 2019 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java	Fri Nov 29 17:31:01 2019 +0100
@@ -105,8 +105,8 @@
     }
 
     protected void processRecursionSafe() throws IOException {
+        Dispatcher lastDisp = null;
         Dispatcher disp = dispatcher();
-
         Path path;
         boolean validStartTime = recording != null || disp.startTime != null;
         if (validStartTime) {
@@ -125,18 +125,20 @@
             long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
 
             while (!isClosed()) {
-                boolean awaitnewEvent = false;
                 while (!isClosed() && !currentParser.isChunkFinished()) {
                     disp = dispatcher();
-                    ParserConfiguration pc = disp.parserConfiguration;
-                    pc.filterStart = filterStart;
-                    pc.filterEnd = filterEnd;
-                    currentParser.updateConfiguration(pc, true);
-                    currentParser.setFlushOperation(getFlushOperation());
-                    if (pc.isOrdered()) {
-                        awaitnewEvent = processOrdered(disp, awaitnewEvent);
+                    if (disp != lastDisp) {
+                        ParserConfiguration pc = disp.parserConfiguration;
+                        pc.filterStart = filterStart;
+                        pc.filterEnd = filterEnd;
+                        currentParser.updateConfiguration(pc, true);
+                        currentParser.setFlushOperation(getFlushOperation());
+                        lastDisp = disp;
+                    }
+                    if (disp.parserConfiguration.isOrdered()) {
+                        processOrdered(disp);
                     } else {
-                        awaitnewEvent = processUnordered(disp, awaitnewEvent);
+                        processUnordered(disp);
                     }
                     if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
                         close();
@@ -182,29 +184,24 @@
         return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos();
     }
 
-    private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
+    private void processOrdered(Dispatcher c) throws IOException {
         if (sortedCache == null) {
             sortedCache = new RecordedEvent[100_000];
         }
         int index = 0;
         while (true) {
-            RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
+            RecordedEvent e = currentParser.readStreamingEvent();
             if (e == null) {
-                // wait for new event with next call to
-                // readStreamingEvent()
-                awaitNewEvents = true;
                 break;
             }
-            awaitNewEvents = false;
             if (index == sortedCache.length) {
                 sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2);
             }
             sortedCache[index++] = e;
         }
-
         // no events found
         if (index == 0 && currentParser.isChunkFinished()) {
-            return awaitNewEvents;
+            return;
         }
         // at least 2 events, sort them
         if (index > 1) {
@@ -213,12 +210,12 @@
         for (int i = 0; i < index; i++) {
             c.dispatch(sortedCache[i]);
         }
-        return awaitNewEvents;
+        return;
     }
 
-    private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
+    private boolean processUnordered(Dispatcher c) throws IOException {
         while (true) {
-            RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
+            RecordedEvent e = currentParser.readStreamingEvent();
             if (e == null) {
                 return true;
             } else {
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java	Fri Nov 29 15:37:13 2019 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java	Fri Nov 29 17:31:01 2019 +0100
@@ -121,4 +121,8 @@
     public boolean hasChanged() {
         return changed;
     }
+
+    public void setChanged(boolean changed) {
+        this.changed = changed;
+    }
 }
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java	Fri Nov 29 15:37:13 2019 +0000
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java	Fri Nov 29 17:31:01 2019 +0100
@@ -26,9 +26,7 @@
 package jdk.jfr.api.consumer.recordingstream;
 
 import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import jdk.jfr.Event;
@@ -41,7 +39,7 @@
  * @summary Tests RecordingStream::close()
  * @key jfr
  * @requires vm.hasJFR
- * @library /test/lib
+ * @library /test/lib /test/jdk
  * @run main/othervm jdk.jfr.api.consumer.recordingstream.TestClose
  */
 public class TestClose {
@@ -58,96 +56,82 @@
         testCloseNoEvents();
     }
 
-    private static void testCloseMySelf() throws Exception {
-        log("Entering testCloseMySelf()");
-        CountDownLatch l1 = new CountDownLatch(1);
-        CountDownLatch l2 = new CountDownLatch(1);
-        RecordingStream r = new RecordingStream();
-        r.onEvent(e -> {
-            try {
-                l1.await();
-                r.close();
-                l2.countDown();
-            } catch (InterruptedException ie) {
-                throw new Error(ie);
-            }
-        });
-        r.startAsync();
-        CloseEvent c = new CloseEvent();
-        c.commit();
-        l1.countDown();
-        l2.await();
-        log("Leaving testCloseMySelf()");
+    private static void testCloseUnstarted() {
+        System.out.println("testCloseUnstarted()");
+
+        try (RecordingStream r = new RecordingStream()) {
+            r.close();
+        }
+    }
+
+    private static void testCloseStarted() {
+        System.out.println("testCloseStarted()");
+
+        try (RecordingStream r = new RecordingStream()) {
+            r.startAsync();
+        } // <- Close
+    }
+
+    private static void testCloseTwice() {
+        System.out.println("Entering testCloseTwice()");
+
+        try (RecordingStream r = new RecordingStream()) {
+            r.startAsync();
+            r.close();
+        } // <- Second close
     }
 
     private static void testCloseStreaming() throws Exception {
-        log("Entering testCloseStreaming()");
+        System.out.println("Entering testCloseStreaming()");
+
+        EventProducer p = new EventProducer();
+        p.start();
         CountDownLatch streaming = new CountDownLatch(1);
-        RecordingStream r = new RecordingStream();
-        AtomicLong count = new AtomicLong();
-        r.onEvent(e -> {
-            if (count.incrementAndGet() > 100) {
+        try (RecordingStream r = new RecordingStream()) {
+            r.onEvent(e -> {
                 streaming.countDown();
-            }
-        });
-        r.startAsync();
-        var streamingLoop = CompletableFuture.runAsync(() -> {
-            while (true) {
-                CloseEvent c = new CloseEvent();
-                c.commit();
-            }
-        });
-        streaming.await();
-        r.close();
-        streamingLoop.cancel(true);
-        log("Leaving testCloseStreaming()");
+            });
+            r.startAsync();
+            streaming.await();
+        } // <- Close
+        p.kill();
     }
 
-    private static void testCloseStarted() {
-        log("Entering testCloseStarted()");
-        RecordingStream r = new RecordingStream();
-        r.startAsync();
-        r.close();
-        log("Leaving testCloseStarted()");
-    }
+    private static void testCloseMySelf() throws Exception {
+        System.out.println("testCloseMySelf()");
 
-    private static void testCloseUnstarted() {
-        log("Entering testCloseUnstarted()");
-        RecordingStream r = new RecordingStream();
-        r.close();
-        log("Leaving testCloseUnstarted()");
-    }
-
-    private static void testCloseTwice() {
-        log("Entering testCloseTwice()");
-        RecordingStream r = new RecordingStream();
-        r.startAsync();
-        r.close();
-        r.close();
-        log("Leaving testCloseTwice()");
+        CountDownLatch closed = new CountDownLatch(1);
+        try (RecordingStream r = new RecordingStream()) {
+            r.onEvent(e -> {
+                r.close();  // <- Close
+                closed.countDown();
+            });
+            r.startAsync();
+            CloseEvent c = new CloseEvent();
+            c.commit();
+            closed.await();
+        }
     }
 
     private static void testCloseNoEvents() throws Exception {
+        System.out.println("testCloseNoEvents()");
+
         try (Recording r = new Recording()) {
             r.start();
             CountDownLatch finished = new CountDownLatch(2);
             AtomicReference<Thread> streamingThread = new AtomicReference<>();
             try (EventStream es = EventStream.openRepository()) {
                 es.setStartTime(Instant.EPOCH);
-                es.onFlush( () -> {
+                es.onFlush(() -> {
                     streamingThread.set(Thread.currentThread());
-                    finished.countDown();;
+                    finished.countDown();
                 });
                 es.startAsync();
                 finished.await();
-            } // <- EventStream::close should terminate thread
+            } // <- Close should terminate thread
             while (streamingThread.get().isAlive()) {
                 Thread.sleep(10);
             }
         }
     }
-
-    private static void log(String msg) {
-        System.out.println(msg);
-    }
 }
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java	Fri Nov 29 15:37:13 2019 +0000
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java	Fri Nov 29 17:31:01 2019 +0100
@@ -36,7 +36,7 @@
  * @summary Tests RecordingStream::onEvent(...)
  * @key jfr
  * @requires vm.hasJFR
- * @library /test/lib
+ * @library /test/lib /test/jdk
  * @run main/othervm jdk.jfr.api.consumer.recordingstream.TestOnEvent
  */
 public class TestOnEvent {
@@ -58,6 +58,7 @@
         testOnEvent();
         testNamedEvent();
         testTwoEventWithSameName();
+        testOnEventAfterStart();
     }
 
     private static void testOnEventNull() {
@@ -149,6 +150,29 @@
         log("Leaving testOnEvent()");
     }
 
+    private static void testOnEventAfterStart() {
+        try (RecordingStream r = new RecordingStream()) {
+            EventProducer p = new EventProducer();
+            p.start();
+            Thread addHandler = new Thread(() ->  {
+                r.onEvent(e -> {
+                    // Got event, close stream
+                    r.close();
+                });
+            });
+            r.onFlush(() ->  {
+                // Only add handler once
+                if (!"started".equals(addHandler.getName()))  {
+                    addHandler.setName("started");
+                    addHandler.start();
+                }
+            });
+            r.start();
+            p.kill();
+        }
+    }
+
+
     private static void log(String msg) {
         System.out.println(msg);
     }