# HG changeset patch # User egahlin # Date 1574955505 -3600 # Node ID 72f3dd43dd280155302032c5239680328841cc97 # Parent be238525d240d89413e83abfa7a12733b6e82a72 8234888: EventStream::close doesn't abort streaming thread Reviewed-by: mgronlun, mseledtsov diff -r be238525d240 -r 72f3dd43dd28 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java Thu Nov 28 16:28:53 2019 +0100 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java Thu Nov 28 16:38:25 2019 +0100 @@ -48,7 +48,7 @@ * an event stream. */ abstract class AbstractEventStream implements EventStream { - private final static AtomicLong counter = new AtomicLong(1); + private final static AtomicLong counter = new AtomicLong(0); private final Object terminated = new Object(); private final Runnable flushOperation = () -> dispatcher().runFlushActions(); diff -r be238525d240 -r 72f3dd43dd28 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java Thu Nov 28 16:28:53 2019 +0100 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java Thu Nov 28 16:38:25 2019 +0100 @@ -106,6 +106,7 @@ private Runnable flushOperation; private ParserConfiguration configuration; + private volatile boolean closed; public ChunkParser(RecordingInput input) throws IOException { this(input, new ParserConfiguration()); @@ -284,6 +285,9 @@ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes"); } while (true) { + if (closed) { + return true; + } if (chunkHeader.getLastNanos() > filterEnd) { return true; } @@ -455,4 +459,9 @@ return chunkHeader.isFinalChunk(); } + public void close() { + this.closed = true; + Utils.notifyFlush(); + } + } diff -r be238525d240 -r 72f3dd43dd28 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Thu Nov 28 16:28:53 2019 +0100 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Thu Nov 28 16:38:25 2019 +0100 @@ -69,6 +69,9 @@ setClosed(true); dispatcher().runCloseActions(); repositoryFiles.close(); + if (currentParser != null) { + currentParser.close(); + } } @Override diff -r be238525d240 -r 72f3dd43dd28 test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java --- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Thu Nov 28 16:28:53 2019 +0100 +++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Thu Nov 28 16:38:25 2019 +0100 @@ -25,11 +25,15 @@ package jdk.jfr.api.consumer.recordingstream; +import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import jdk.jfr.Event; +import jdk.jfr.Recording; +import jdk.jfr.consumer.EventStream; import jdk.jfr.consumer.RecordingStream; /** @@ -51,6 +55,7 @@ testCloseTwice(); testCloseStreaming(); testCloseMySelf(); + testCloseNoEvents(); } private static void testCloseMySelf() throws Exception { @@ -122,6 +127,26 @@ log("Leaving testCloseTwice()"); } + private static void testCloseNoEvents() throws Exception { + try (Recording r = new Recording()) { + r.start(); + CountDownLatch finished = new CountDownLatch(2); + AtomicReference streamingThread = new AtomicReference<>(); + try (EventStream es = EventStream.openRepository()) { + es.setStartTime(Instant.EPOCH); + es.onFlush( () -> { + streamingThread.set(Thread.currentThread()); + finished.countDown();; + }); + es.startAsync(); + finished.await(); + } // <- EventStream::close should terminate thread + while (streamingThread.get().isAlive()) { + Thread.sleep(10); + } + } + } + private static void log(String msg) { System.out.println(msg); }