8234888: EventStream::close doesn't abort streaming thread
authoregahlin
Thu, 28 Nov 2019 16:38:25 +0100
changeset 59310 72f3dd43dd28
parent 59309 be238525d240
child 59311 b42eaca7d234
8234888: EventStream::close doesn't abort streaming thread Reviewed-by: mgronlun, mseledtsov
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java
test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java	Thu Nov 28 16:28:53 2019 +0100
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java	Thu Nov 28 16:38:25 2019 +0100
@@ -48,7 +48,7 @@
  * an event stream.
  */
 abstract class AbstractEventStream implements EventStream {
-    private final static AtomicLong counter = new AtomicLong(1);
+    private final static AtomicLong counter = new AtomicLong(0);
 
     private final Object terminated = new Object();
     private final Runnable flushOperation = () -> dispatcher().runFlushActions();
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java	Thu Nov 28 16:28:53 2019 +0100
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java	Thu Nov 28 16:38:25 2019 +0100
@@ -106,6 +106,7 @@
 
     private Runnable flushOperation;
     private ParserConfiguration configuration;
+    private volatile boolean closed;
 
     public ChunkParser(RecordingInput input) throws IOException {
         this(input, new ParserConfiguration());
@@ -284,6 +285,9 @@
             Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
         }
         while (true) {
+            if (closed) {
+                return true;
+            }
             if (chunkHeader.getLastNanos() > filterEnd)  {
               return true;
             }
@@ -455,4 +459,9 @@
         return chunkHeader.isFinalChunk();
     }
 
+    public void close() {
+        this.closed = true;
+        Utils.notifyFlush();
+    }
+
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java	Thu Nov 28 16:28:53 2019 +0100
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java	Thu Nov 28 16:38:25 2019 +0100
@@ -69,6 +69,9 @@
         setClosed(true);
         dispatcher().runCloseActions();
         repositoryFiles.close();
+        if (currentParser != null) {
+            currentParser.close();
+        }
     }
 
     @Override
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java	Thu Nov 28 16:28:53 2019 +0100
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java	Thu Nov 28 16:38:25 2019 +0100
@@ -25,11 +25,15 @@
 
 package jdk.jfr.api.consumer.recordingstream;
 
+import java.time.Instant;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import jdk.jfr.Event;
+import jdk.jfr.Recording;
+import jdk.jfr.consumer.EventStream;
 import jdk.jfr.consumer.RecordingStream;
 
 /**
@@ -51,6 +55,7 @@
         testCloseTwice();
         testCloseStreaming();
         testCloseMySelf();
+        testCloseNoEvents();
     }
 
     private static void testCloseMySelf() throws Exception {
@@ -122,6 +127,26 @@
         log("Leaving testCloseTwice()");
     }
 
+    private static void testCloseNoEvents() throws Exception {
+        try (Recording r = new Recording()) {
+            r.start();
+            CountDownLatch finished = new CountDownLatch(2);
+            AtomicReference<Thread> streamingThread = new AtomicReference<>();
+            try (EventStream es = EventStream.openRepository()) {
+                es.setStartTime(Instant.EPOCH);
+                es.onFlush( () -> {
+                    streamingThread.set(Thread.currentThread());
+                    finished.countDown();;
+                });
+                es.startAsync();
+                finished.await();
+            } // <- EventStream::close should terminate thread
+            while (streamingThread.get().isAlive()) {
+                Thread.sleep(10);
+            }
+        }
+    }
+
     private static void log(String msg) {
         System.out.println(msg);
     }