--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon Jun 03 16:21:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Tue Jun 25 12:03:01 2019 +0200
@@ -66,20 +66,20 @@
this.repostory = repostory;
}
- Path nextPath(Path previous) {
- long startTimeNanos = 0L;
- if (previous != null) {
- startTimeNanos = pathLookup.get(previous);
- }
- SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
- if (!after.isEmpty()) {
- Path path = after.get(after.firstKey());
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
- return path;
- }
+ long getTimestamp(Path p) {
+ return pathLookup.get(p);
+ }
+
+ Path nextPath(long startTimeNanos) {
while (!closed) {
+ SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
+ if (!after.isEmpty()) {
+ Path path = after.get(after.firstKey());
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
+ return path;
+ }
try {
- if (updatePaths()) {
+ if (updatePaths(repostory)) {
continue;
}
} catch (IOException e) {
@@ -90,7 +90,9 @@
// and retry later.
}
try {
- pathSet.wait(1000);
+ synchronized (pathSet) {
+ pathSet.wait(1000);
+ }
} catch (InterruptedException e) {
// ignore
}
@@ -98,15 +100,18 @@
return null;
}
- private boolean updatePaths() throws IOException {
+ private boolean updatePaths(Path repo) throws IOException {
+ if (repo == null) {
+ repo = Repository.getRepository().getRepositoryPath().toPath();
+ }
boolean foundNew = false;
List<Path> added = new ArrayList<>();
Set<Path> current = new HashSet<>();
- if (!Files.exists(repostory)) {
+ if (!Files.exists(repo)) {
// Repository removed, probably due to shutdown
return true;
}
- try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repostory, "*.jfr")) {
+ try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo, "*.jfr")) {
for (Path p : dirStream) {
if (!pathLookup.containsKey(p)) {
added.add(p);
@@ -150,8 +155,8 @@
}
public void close() {
- this.closed = true;
synchronized (pathSet) {
+ this.closed = true;
pathSet.notify();
}
}
@@ -165,7 +170,7 @@
private ChunkParser chunkParser;
private boolean reuse = true;
private RecordedEvent[] sortedList;
- private boolean ordered;
+ private boolean ordered = true;
public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
super(acc);
@@ -174,96 +179,78 @@
@Override
public void process() throws IOException {
- Path path = repositoryFiles.nextPath(null);
+ Path path = repositoryFiles.nextPath(startNanos);
+ startNanos = repositoryFiles.getTimestamp(path) + 1;
try (RecordingInput input = new RecordingInput(path.toFile())) {
- chunkParser = new ChunkParser(input, reuse);
while (!isClosed()) {
- boolean reuse = this.reuse;
- boolean ordered = this.ordered;
- chunkParser.setReuse(reuse);
- chunkParser.setOrdered(ordered);
- chunkParser.resetEventCache();
- chunkParser.updateEventParsers();
+ // chunkParser = chunkParser.nextChunkParser();
+ chunkParser = new ChunkParser(input, this.reuse);
+ boolean awaitnewEvent = false;
+ while (!isClosed() && !chunkParser.isChunkFinished()) {
+ chunkParser.setReuse(this.reuse);
+ chunkParser.setOrdered(this.ordered);
+ chunkParser.resetEventCache();
+ chunkParser.updateEventParsers();
+ if (ordered) {
+ awaitnewEvent = processOrdered(awaitnewEvent);
+ } else {
+ awaitnewEvent = processUnordered(awaitnewEvent);
+ }
+ runFlushActions();
+ }
- if (ordered) {
- processOrdered2();
- } else {
- processUnordered();
- }
- if (chunkParser.isLastChunk()) {
- return;
- }
- path = repositoryFiles.nextPath(path);
- input.newFile(path);
+ path = repositoryFiles.nextPath(startNanos);
+ startNanos = repositoryFiles.getTimestamp(path) + 1;
+ input.setFile(path);
}
}
}
- private void processOrdered2() throws IOException {
+ private boolean processOrdered(boolean awaitNewEvents) throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
}
+ int index = 0;
while (true) {
- boolean reuse = this.reuse;
- boolean ordered = this.ordered;
- chunkParser.setReuse(reuse);
- chunkParser.setOrdered(ordered);
- chunkParser.resetEventCache();
- chunkParser.updateEventParsers();
- boolean awaitNewEvents = false;
- int index = 0;
- while (true) {
- RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
- if (e == null) {
- // wait for new event with next call to
- // readStreamingEvent()
- awaitNewEvents = true;
- break;
- }
- awaitNewEvents = false;
- if (index == sortedList.length) {
- sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
- }
- sortedList[index++] = e;
+ RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+ if (e == null) {
+ // wait for new event with next call to
+ // readStreamingEvent()
+ awaitNewEvents = true;
+ break;
+ }
+ awaitNewEvents = false;
+ if (index == sortedList.length) {
+ sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
}
+ sortedList[index++] = e;
+ }
- // no events found
- if (index == 0 && chunkParser.isChunkFinished()) {
- return;
- }
- // at least 2 events, sort them
- if (index > 1) {
- Arrays.sort(sortedList, 0, index, END_TIME);
- }
- for (int i = 0; i < index; i++) {
- dispatch(sortedList[i]);
- }
- if (chunkParser.isChunkFinished()) {
- return;
- }
- runFlushActions();
+ // no events found
+ if (index == 0 && chunkParser.isChunkFinished()) {
+ return awaitNewEvents;
}
+ // at least 2 events, sort them
+ if (index > 1) {
+ Arrays.sort(sortedList, 0, index, END_TIME);
+ }
+ for (int i = 0; i < index; i++) {
+ dispatch(sortedList[i]);
+ }
+ return awaitNewEvents;
}
- private void processUnordered() throws IOException {
- boolean awaitNewEvents = false;
+ private boolean processUnordered(boolean awaitNewEvents) throws IOException {
while (true) {
- boolean reuse = this.reuse;
- boolean ordered = this.ordered;
- chunkParser.setReuse(reuse);
- chunkParser.setOrdered(ordered);
- chunkParser.resetEventCache();
- chunkParser.updateEventParsers();
- while (true) {
- RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
- if (e == null) {
- awaitNewEvents = true;
- break;
- }
+ RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+ if (e == null) {
+ awaitNewEvents = true;
+ break;
+ } else {
dispatch(e);
}
- runFlushActions();
}
+ return awaitNewEvents;
}
public void setReuse(boolean reuse) {
@@ -354,9 +341,8 @@
private final EventConsumer eventConsumer;
- public EventDirectoryStream(AccessControlContext acc) throws IOException {
- // Path p = Repository.getRepository().getRepositoryPath().toPath();
- eventConsumer = new SharedParserConsumer(acc);
+ public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException {
+ eventConsumer = new ParserConsumer(acc, p);
}
public void close() {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Mon Jun 03 16:21:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Tue Jun 25 12:03:01 2019 +0200
@@ -81,7 +81,7 @@
this.recording = new Recording();
this.recording.setFlushInterval(Duration.ofMillis(1000));
try {
- this.stream = new EventDirectoryStream(acc);
+ this.stream = new EventDirectoryStream(acc, null);
} catch (IOException ioe) {
throw new IllegalStateException(ioe.getMessage());
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java Mon Jun 03 16:21:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java Tue Jun 25 12:03:01 2019 +0200
@@ -65,8 +65,8 @@
}
}
- private final RandomAccessFile file;
- private final String filename;
+ private RandomAccessFile file;
+ private String filename;
private Block currentBlock = new Block();
private Block previousBlock = new Block();
private long position;
@@ -75,8 +75,16 @@
public RecordingInput(File f, int blockSize) throws IOException {
this.blockSize = blockSize;
+ initialize(f);
+ }
+
+ private void initialize(File f) throws IOException {
this.filename = f.getAbsolutePath().toString();
this.file = new RandomAccessFile(f, "r");
+ this.position = 0;
+ this.size = -1;
+ this.currentBlock.reset();
+ this.previousBlock.reset();
if (f.length() < 8) {
throw new IOException("Not a valid Flight Recorder file. File length is only " + f.length() + " bytes.");
}
@@ -335,19 +343,14 @@
// Purpose of this method is to reuse block cache from a
// previous RecordingInput
- public RecordingInput newFile(Path path) throws IOException {
+ public void setFile(Path path) throws IOException {
try {
- close();
+ file.close();
} catch (IOException e) {
// perhaps deleted
}
- RecordingInput input = new RecordingInput(path.toFile(), this.blockSize);
- input.currentBlock = this.currentBlock;
- input.currentBlock.reset();
- input.previousBlock = this.previousBlock;
- input.previousBlock.reset();
-
- return input;
+ file = null;
+ initialize(path.toFile());
}
}