--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Wed Jun 26 16:04:47 2019 +0200
@@ -71,7 +71,7 @@
}
public ChunkParser(ChunkParser previous) throws IOException {
- this(new ChunkHeader(previous.input), null, 500);
+ this(new ChunkHeader(previous.input), previous, 500);
}
private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Wed Jun 26 16:04:47 2019 +0200
@@ -54,6 +54,11 @@
import jdk.jfr.internal.consumer.EventConsumer;
import jdk.jfr.internal.consumer.RecordingInput;
+/**
+ * Implementation of an {@code EventStream}} that operates against a directory
+ * with chunk files.
+ *
+ */
final class EventDirectoryStream implements EventStream {
private static final class RepositoryFiles {
@@ -67,7 +72,7 @@
}
long getTimestamp(Path p) {
- return pathLookup.get(p);
+ return pathLookup.get(p);
}
Path nextPath(long startTimeNanos) {
@@ -168,9 +173,7 @@
private static final int DEFAULT_ARRAY_SIZE = 10_000;
private final RepositoryFiles repositoryFiles;
private ChunkParser chunkParser;
- private boolean reuse = true;
private RecordedEvent[] sortedList;
- private boolean ordered = true;
public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
super(acc);
@@ -200,6 +203,9 @@
}
path = repositoryFiles.nextPath(startNanos);
+ if (path == null) {
+ return; // stream closed
+ }
startNanos = repositoryFiles.getTimestamp(path) + 1;
input.setFile(path);
chunkParser = chunkParser.newChunkParser();
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Wed Jun 26 16:04:47 2019 +0200
@@ -53,7 +53,6 @@
private RecordedEvent[] sortedList;
private boolean ordered;
-
public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
super(acc);
this.input = input;
@@ -68,8 +67,8 @@
chunkParser.setReuse(reuse);
chunkParser.setOrdered(ordered);
chunkParser.resetEventCache();
+ chunkParser.setParserFilter(eventFilter);
chunkParser.updateEventParsers();
-
if (ordered) {
processOrdered();
} else {
@@ -78,6 +77,7 @@
if (chunkParser.isLastChunk()) {
return;
}
+ runFlushActions();
chunkParser = chunkParser.nextChunkParser();
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Wed Jun 26 16:04:47 2019 +0200
@@ -331,7 +331,7 @@
@Override
public void setReuse(boolean reuse) {
- throw new UnsupportedOperationException("Unordered not implemented yet");
+ stream.setReuse(reuse);
}
@Override
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Wed Jun 26 16:04:47 2019 +0200
@@ -78,6 +78,8 @@
private final static VarHandle dispatcherHandle;
private final static VarHandle flushActionsHandle;
private final static VarHandle closeActionsHandle;
+ private final static VarHandle orderedHandle;
+ private final static VarHandle reuseHandle;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
@@ -86,6 +88,9 @@
dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class);
flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class);
closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class);
+ orderedHandle = l.findVarHandle(EventConsumer.class, "ordered", boolean.class);
+ reuseHandle = l.findVarHandle(EventConsumer.class, "reuse", boolean.class);
+
} catch (ReflectiveOperationException e) {
throw new InternalError(e);
}
@@ -101,6 +106,10 @@
// set by VarHandle
private Runnable[] closeActions = new Runnable[0];
+ protected boolean ordered = true;
+
+ protected boolean reuse = true;
+
protected InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
private final AccessControlContext accessControlContext;
@@ -329,11 +338,11 @@
abstract public void close();
public void setReuse(boolean reuse) {
-
+ reuseHandle.setVolatile(this, reuse);
}
public void setOrdered(boolean ordered) {
-
+ orderedHandle.setVolatile(this, ordered);
}
}
\ No newline at end of file
--- a/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Tue Jun 25 14:02:06 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Wed Jun 26 16:04:47 2019 +0200
@@ -92,7 +92,7 @@
es.onEvent(e -> {
Instant endTime = e.getEndTime();
if (endTime.isBefore(timestamp.get())) {
- throw new Error("Events are not ordered! Reues = " + reuse);
+ throw new Error("Events are not ordered! Reuse = " + reuse);
}
timestamp.set(endTime);
});
@@ -118,7 +118,7 @@
});
es.start();
if (!unoreded.get()) {
- throw new Exception("Expected at least some events to be out of order! Reues = " + reuse);
+ throw new Exception("Expected at least some events to be out of order! Reuse = " + reuse);
}
}
}