--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java Thu Oct 24 07:02:36 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java Sat Oct 26 23:59:51 2019 +0200
@@ -258,7 +258,7 @@
}
} else {
if (typeId != 0) { // Not metadata event
- Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Unknwon event type " + typeId);
+ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Unknown event type " + typeId);
}
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Thu Oct 24 07:02:36 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java Sat Oct 26 23:59:51 2019 +0200
@@ -144,6 +144,11 @@
return;
}
long durationNanos = currentParser.getChunkDuration();
+ if (durationNanos == 0) {
+ // Avoid reading the same chunk again and again if
+ // duration is 0 ns
+ durationNanos++;
+ }
path = repositoryFiles.nextPath(currentChunkStartNanos + durationNanos);
if (path == null) {
return; // stream closed
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Thu Oct 24 07:02:36 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Sat Oct 26 23:59:51 2019 +0200
@@ -109,6 +109,24 @@
}
Path nextPath(long startTimeNanos) {
+ if (closed) {
+ return null;
+ }
+ // Try to get the 'exact' path first
+ // to avoid skipping files if repository
+ // is updated while DirectoryStream
+ // is traversing it
+ Path path = pathSet.get(startTimeNanos);
+ if (path != null) {
+ return path;
+ }
+ // Update paths
+ try {
+ updatePaths();
+ } catch (IOException e) {
+ // ignore
+ }
+ // try to get the next file
return path(startTimeNanos);
}
--- a/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java Thu Oct 24 07:02:36 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java Sat Oct 26 23:59:51 2019 +0200
@@ -25,10 +25,14 @@
package jdk.jfr.api.consumer.streaming;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import jdk.jfr.Event;
+import jdk.jfr.FlightRecorder;
import jdk.jfr.Name;
import jdk.jfr.Recording;
import jdk.jfr.consumer.EventStream;
@@ -36,7 +40,7 @@
/**
* @test
- * @summary Verifies that a stream from a repository starts at the latest event
+ * @summary Verifies that EventStream::openRepository() read from the latest flush
* @key jfr
* @requires vm.hasJFR
* @library /test/lib
@@ -46,6 +50,8 @@
@Name("NotLatest")
static class NotLatestEvent extends Event {
+
+ public int id;
}
@Name("Latest")
@@ -57,15 +63,23 @@
}
public static void main(String... args) throws Exception {
+ CountDownLatch notLatestEvent = new CountDownLatch(6);
+ CountDownLatch beginChunks = new CountDownLatch(1);
+
try (RecordingStream r = new RecordingStream()) {
+ r.onEvent("MakeChunks", event-> {
+ System.out.println(event);
+ beginChunks.countDown();
+ });
+ r.onEvent("NotLatest", event -> {
+ System.out.println(event);
+ notLatestEvent.countDown();
+ });
r.startAsync();
MakeChunks e = new MakeChunks();
e.commit();
- CountDownLatch beginChunks = new CountDownLatch(1);
- r.onEvent("MakeChunks", event-> {
- beginChunks.countDown();
- });
- System.out.println("Waitning for first chunk");
+
+ System.out.println("Waiting for first chunk");
beginChunks.await();
// Create 5 chunks with events in the repository
for (int i = 0; i < 5; i++) {
@@ -73,52 +87,49 @@
try (Recording r1 = new Recording()) {
r1.start();
NotLatestEvent notLatest = new NotLatestEvent();
+ notLatest.id = i;
notLatest.commit();
r1.stop();
}
}
System.out.println("All empty chunks created");
- // Create an event in a segment, typically the first.
+
+ // Create event in new chunk
NotLatestEvent notLatest = new NotLatestEvent();
+ notLatest.id = 5;
notLatest.commit();
+
+ // This latch ensures thatNotLatest has been
+ // flushed and a new valid position has been written
+ // to the chunk header
+ notLatestEvent.await(80, TimeUnit.SECONDS);
+ if (notLatestEvent.getCount() != 0) {
+ Recording rec = FlightRecorder.getFlightRecorder().takeSnapshot();
+ Path p = Paths.get("c:\\testlatest\\not-latest.jfr");
+ rec.dump(p);
+ System.out.println("dumped " + p);
+ rec.close();
+ }
+
try (EventStream s = EventStream.openRepository()) {
- // Wait for next segment
- // to prevent flush to prevent NotLatest to be included
- awaitFlush(r);
System.out.println("EventStream opened");
AtomicBoolean foundLatest = new AtomicBoolean();
s.onEvent(event -> {
String name = event.getEventType().getName();
System.out.println("Found event " + name);
foundLatest.set(name.equals("Latest"));
- s.close();
});
- System.out.println("Added onEvent handler");
s.startAsync();
- // wait for next segment
- awaitFlush(s);
- // Emit the latest event
- LatestEvent latest = new LatestEvent();
- latest.commit();
- System.out.println("Latest event emitted");
- System.out.println("Waiting for termination");
- s.awaitTermination();
- if (!foundLatest.get()) {
- throw new Exception("Didn't find latest event!");
+ // Must loop here as there is no guarantee
+ // that the parser thread starts before event
+ // is flushed
+ while (!foundLatest.get()) {
+ LatestEvent latest = new LatestEvent();
+ latest.commit();
+ System.out.println("Latest event emitted. Waiting 1 s ...");
+ Thread.sleep(1000);
}
}
}
}
-
- private static void awaitFlush(EventStream stream) throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(1);
- System.out.println("Waiting for flush...");
- final Runnable l = () -> {
- System.out.println("Flush arrived!");
- latch.countDown();
- };
- stream.onFlush(l);
- latch.await();
- stream.remove(l);
- }
}