# HG changeset patch # User egahlin # Date 1575045061 -3600 # Node ID 2c3578aa0bdfb7b5808cb3b89e3f172a8b0a3a14 # Parent 851a389fc54db3dadd01dcea6ca88c4c32d5d01b 8234671: JFR api/consumer/recordingstream/TestStart.java failed due to timeout at testStartTwice() Reviewed-by: mgronlun diff -r 851a389fc54d -r 2c3578aa0bdf src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java Fri Nov 29 15:37:13 2019 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java Fri Nov 29 17:31:01 2019 +0100 @@ -76,9 +76,10 @@ abstract public void close(); protected final Dispatcher dispatcher() { - if (configuration.hasChanged()) { + if (configuration.hasChanged()) { // quick check synchronized (configuration) { dispatcher = new Dispatcher(configuration); + configuration.setChanged(false); } } return dispatcher; diff -r 851a389fc54d -r 2c3578aa0bdf src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java Fri Nov 29 15:37:13 2019 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java Fri Nov 29 17:31:01 2019 +0100 @@ -190,44 +190,40 @@ * * @param awaitNewEvents wait for new data. */ - RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException { + RecordedEvent readStreamingEvent() throws IOException { long absoluteChunkEnd = chunkHeader.getEnd(); - while (true) { - RecordedEvent event = readEvent(); - if (event != null) { - return event; - } - if (!awaitNewEvents) { - return null; - } - long lastValid = absoluteChunkEnd; - long metadataPoistion = chunkHeader.getMetataPosition(); - long contantPosition = chunkHeader.getConstantPoolPosition(); - chunkFinished = awaitUpdatedHeader(absoluteChunkEnd, configuration.filterEnd); - if (chunkFinished) { - Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end"); - return null; - } - absoluteChunkEnd = chunkHeader.getEnd(); - // Read metadata and constant pools for the next segment - if (chunkHeader.getMetataPosition() != metadataPoistion) { - Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers"); - MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata); - ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); - parsers = factory.getParsers(); - typeMap = factory.getTypeMap(); - updateConfiguration();; - } - if (contantPosition != chunkHeader.getConstantPoolPosition()) { - Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values"); - constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false)); - fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart()); - constantLookups.forEach(c -> c.getLatestPool().setResolving()); - constantLookups.forEach(c -> c.getLatestPool().resolve()); - constantLookups.forEach(c -> c.getLatestPool().setResolved()); - } - input.position(lastValid); + RecordedEvent event = readEvent(); + if (event != null) { + return event; + } + long lastValid = absoluteChunkEnd; + long metadataPosition = chunkHeader.getMetataPosition(); + long contantPosition = chunkHeader.getConstantPoolPosition(); + chunkFinished = awaitUpdatedHeader(absoluteChunkEnd, configuration.filterEnd); + if (chunkFinished) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end"); + return null; } + absoluteChunkEnd = chunkHeader.getEnd(); + // Read metadata and constant pools for the next segment + if (chunkHeader.getMetataPosition() != metadataPosition) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers"); + MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata); + ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); + parsers = factory.getParsers(); + typeMap = factory.getTypeMap(); + updateConfiguration(); + } + if (contantPosition != chunkHeader.getConstantPoolPosition()) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values"); + constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false)); + fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart()); + constantLookups.forEach(c -> c.getLatestPool().setResolving()); + constantLookups.forEach(c -> c.getLatestPool().resolve()); + constantLookups.forEach(c -> c.getLatestPool().setResolved()); + } + input.position(lastValid); + return null; } /** diff -r 851a389fc54d -r 2c3578aa0bdf src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Fri Nov 29 15:37:13 2019 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Fri Nov 29 17:31:01 2019 +0100 @@ -105,8 +105,8 @@ } protected void processRecursionSafe() throws IOException { + Dispatcher lastDisp = null; Dispatcher disp = dispatcher(); - Path path; boolean validStartTime = recording != null || disp.startTime != null; if (validStartTime) { @@ -125,18 +125,20 @@ long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE; while (!isClosed()) { - boolean awaitnewEvent = false; while (!isClosed() && !currentParser.isChunkFinished()) { disp = dispatcher(); - ParserConfiguration pc = disp.parserConfiguration; - pc.filterStart = filterStart; - pc.filterEnd = filterEnd; - currentParser.updateConfiguration(pc, true); - currentParser.setFlushOperation(getFlushOperation()); - if (pc.isOrdered()) { - awaitnewEvent = processOrdered(disp, awaitnewEvent); + if (disp != lastDisp) { + ParserConfiguration pc = disp.parserConfiguration; + pc.filterStart = filterStart; + pc.filterEnd = filterEnd; + currentParser.updateConfiguration(pc, true); + currentParser.setFlushOperation(getFlushOperation()); + lastDisp = disp; + } + if (disp.parserConfiguration.isOrdered()) { + processOrdered(disp); } else { - awaitnewEvent = processUnordered(disp, awaitnewEvent); + processUnordered(disp); } if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) { close(); @@ -182,29 +184,24 @@ return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos(); } - private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException { + private void processOrdered(Dispatcher c) throws IOException { if (sortedCache == null) { sortedCache = new RecordedEvent[100_000]; } int index = 0; while (true) { - RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents); + RecordedEvent e = currentParser.readStreamingEvent(); if (e == null) { - // wait for new event with next call to - // readStreamingEvent() - awaitNewEvents = true; break; } - awaitNewEvents = false; if (index == sortedCache.length) { sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2); } sortedCache[index++] = e; } - // no events found if (index == 0 && currentParser.isChunkFinished()) { - return awaitNewEvents; + return; } // at least 2 events, sort them if (index > 1) { @@ -213,12 +210,12 @@ for (int i = 0; i < index; i++) { c.dispatch(sortedCache[i]); } - return awaitNewEvents; + return; } - private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException { + private boolean processUnordered(Dispatcher c) throws IOException { while (true) { - RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents); + RecordedEvent e = currentParser.readStreamingEvent(); if (e == null) { return true; } else { diff -r 851a389fc54d -r 2c3578aa0bdf src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java Fri Nov 29 15:37:13 2019 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java Fri Nov 29 17:31:01 2019 +0100 @@ -121,4 +121,8 @@ public boolean hasChanged() { return changed; } + + public void setChanged(boolean changed) { + this.changed = changed; + } } diff -r 851a389fc54d -r 2c3578aa0bdf test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java --- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Fri Nov 29 15:37:13 2019 +0000 +++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Fri Nov 29 17:31:01 2019 +0100 @@ -26,9 +26,7 @@ package jdk.jfr.api.consumer.recordingstream; import java.time.Instant; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import jdk.jfr.Event; @@ -41,7 +39,7 @@ * @summary Tests RecordingStream::close() * @key jfr * @requires vm.hasJFR - * @library /test/lib + * @library /test/lib /test/jdk * @run main/othervm jdk.jfr.api.consumer.recordingstream.TestClose */ public class TestClose { @@ -58,96 +56,82 @@ testCloseNoEvents(); } - private static void testCloseMySelf() throws Exception { - log("Entering testCloseMySelf()"); - CountDownLatch l1 = new CountDownLatch(1); - CountDownLatch l2 = new CountDownLatch(1); - RecordingStream r = new RecordingStream(); - r.onEvent(e -> { - try { - l1.await(); - r.close(); - l2.countDown(); - } catch (InterruptedException ie) { - throw new Error(ie); - } - }); - r.startAsync(); - CloseEvent c = new CloseEvent(); - c.commit(); - l1.countDown(); - l2.await(); - log("Leaving testCloseMySelf()"); + private static void testCloseUnstarted() { + System.out.println("testCloseUnstarted()"); + + try (RecordingStream r = new RecordingStream()) { + r.close(); + } + } + + private static void testCloseStarted() { + System.out.println("testCloseStarted()"); + + try (RecordingStream r = new RecordingStream()) { + r.startAsync(); + } // <- Close + } + + private static void testCloseTwice() { + System.out.println("Entering testCloseTwice()"); + + try (RecordingStream r = new RecordingStream()) { + r.startAsync(); + r.close(); + } // <- Second close } private static void testCloseStreaming() throws Exception { - log("Entering testCloseStreaming()"); + System.out.println("Entering testCloseStreaming()"); + + EventProducer p = new EventProducer(); + p.start(); CountDownLatch streaming = new CountDownLatch(1); - RecordingStream r = new RecordingStream(); - AtomicLong count = new AtomicLong(); - r.onEvent(e -> { - if (count.incrementAndGet() > 100) { + try (RecordingStream r = new RecordingStream()) { + r.onEvent(e -> { streaming.countDown(); - } - }); - r.startAsync(); - var streamingLoop = CompletableFuture.runAsync(() -> { - while (true) { - CloseEvent c = new CloseEvent(); - c.commit(); - } - }); - streaming.await(); - r.close(); - streamingLoop.cancel(true); - log("Leaving testCloseStreaming()"); + }); + r.startAsync(); + streaming.await(); + } // <- Close + p.kill(); } - private static void testCloseStarted() { - log("Entering testCloseStarted()"); - RecordingStream r = new RecordingStream(); - r.startAsync(); - r.close(); - log("Leaving testCloseStarted()"); - } + private static void testCloseMySelf() throws Exception { + System.out.println("testCloseMySelf()"); - private static void testCloseUnstarted() { - log("Entering testCloseUnstarted()"); - RecordingStream r = new RecordingStream(); - r.close(); - log("Leaving testCloseUnstarted()"); - } - - private static void testCloseTwice() { - log("Entering testCloseTwice()"); - RecordingStream r = new RecordingStream(); - r.startAsync(); - r.close(); - r.close(); - log("Leaving testCloseTwice()"); + CountDownLatch closed = new CountDownLatch(1); + try (RecordingStream r = new RecordingStream()) { + r.onEvent(e -> { + r.close(); // <- Close + closed.countDown(); + }); + r.startAsync(); + CloseEvent c = new CloseEvent(); + c.commit(); + closed.await(); + } } private static void testCloseNoEvents() throws Exception { + System.out.println("testCloseNoEvents()"); + try (Recording r = new Recording()) { r.start(); CountDownLatch finished = new CountDownLatch(2); AtomicReference streamingThread = new AtomicReference<>(); try (EventStream es = EventStream.openRepository()) { es.setStartTime(Instant.EPOCH); - es.onFlush( () -> { + es.onFlush(() -> { streamingThread.set(Thread.currentThread()); - finished.countDown();; + finished.countDown(); }); es.startAsync(); finished.await(); - } // <- EventStream::close should terminate thread + } // <- Close should terminate thread while (streamingThread.get().isAlive()) { Thread.sleep(10); } } } - - private static void log(String msg) { - System.out.println(msg); - } } diff -r 851a389fc54d -r 2c3578aa0bdf test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java --- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java Fri Nov 29 15:37:13 2019 +0000 +++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java Fri Nov 29 17:31:01 2019 +0100 @@ -36,7 +36,7 @@ * @summary Tests RecordingStream::onEvent(...) * @key jfr * @requires vm.hasJFR - * @library /test/lib + * @library /test/lib /test/jdk * @run main/othervm jdk.jfr.api.consumer.recordingstream.TestOnEvent */ public class TestOnEvent { @@ -58,6 +58,7 @@ testOnEvent(); testNamedEvent(); testTwoEventWithSameName(); + testOnEventAfterStart(); } private static void testOnEventNull() { @@ -149,6 +150,29 @@ log("Leaving testOnEvent()"); } + private static void testOnEventAfterStart() { + try (RecordingStream r = new RecordingStream()) { + EventProducer p = new EventProducer(); + p.start(); + Thread addHandler = new Thread(() -> { + r.onEvent(e -> { + // Got event, close stream + r.close(); + }); + }); + r.onFlush(() -> { + // Only add handler once + if (!"started".equals(addHandler.getName())) { + addHandler.setName("started"); + addHandler.start(); + } + }); + r.start(); + p.kill(); + } + } + + private static void log(String msg) { System.out.println(msg); }