8234888: EventStream::close doesn't abort streaming thread
Reviewed-by: mgronlun, mseledtsov
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java Thu Nov 28 16:28:53 2019 +0100
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java Thu Nov 28 16:38:25 2019 +0100
@@ -48,7 +48,7 @@
* an event stream.
*/
abstract class AbstractEventStream implements EventStream {
- private final static AtomicLong counter = new AtomicLong(1);
+ private final static AtomicLong counter = new AtomicLong(0);
private final Object terminated = new Object();
private final Runnable flushOperation = () -> dispatcher().runFlushActions();
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java Thu Nov 28 16:28:53 2019 +0100
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java Thu Nov 28 16:38:25 2019 +0100
@@ -106,6 +106,7 @@
private Runnable flushOperation;
private ParserConfiguration configuration;
+ private volatile boolean closed;
public ChunkParser(RecordingInput input) throws IOException {
this(input, new ParserConfiguration());
@@ -284,6 +285,9 @@
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
}
while (true) {
+ if (closed) {
+ return true;
+ }
if (chunkHeader.getLastNanos() > filterEnd) {
return true;
}
@@ -455,4 +459,9 @@
return chunkHeader.isFinalChunk();
}
+ public void close() {
+ this.closed = true;
+ Utils.notifyFlush();
+ }
+
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Thu Nov 28 16:28:53 2019 +0100
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Thu Nov 28 16:38:25 2019 +0100
@@ -69,6 +69,9 @@
setClosed(true);
dispatcher().runCloseActions();
repositoryFiles.close();
+ if (currentParser != null) {
+ currentParser.close();
+ }
}
@Override
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Thu Nov 28 16:28:53 2019 +0100
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Thu Nov 28 16:38:25 2019 +0100
@@ -25,11 +25,15 @@
package jdk.jfr.api.consumer.recordingstream;
+import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import jdk.jfr.Event;
+import jdk.jfr.Recording;
+import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.RecordingStream;
/**
@@ -51,6 +55,7 @@
testCloseTwice();
testCloseStreaming();
testCloseMySelf();
+ testCloseNoEvents();
}
private static void testCloseMySelf() throws Exception {
@@ -122,6 +127,26 @@
log("Leaving testCloseTwice()");
}
+ private static void testCloseNoEvents() throws Exception {
+ try (Recording r = new Recording()) {
+ r.start();
+ CountDownLatch finished = new CountDownLatch(2);
+ AtomicReference<Thread> streamingThread = new AtomicReference<>();
+ try (EventStream es = EventStream.openRepository()) {
+ es.setStartTime(Instant.EPOCH);
+ es.onFlush( () -> {
+ streamingThread.set(Thread.currentThread());
+ finished.countDown();;
+ });
+ es.startAsync();
+ finished.await();
+ } // <- EventStream::close should terminate thread
+ while (streamingThread.get().isAlive()) {
+ Thread.sleep(10);
+ }
+ }
+ }
+
private static void log(String msg) {
System.out.println(msg);
}