--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Wed Jul 31 14:07:44 2019 +0200
@@ -45,43 +45,52 @@
*/
final class EventDirectoryStream implements EventStream {
- static final class DirectoryConsumer extends EventConsumer {
+ static final class DirectoryStream extends AbstractEventStream {
private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
private static final int DEFAULT_ARRAY_SIZE = 10_000;
+
private final RepositoryFiles repositoryFiles;
+
private ChunkParser chunkParser;
private RecordedEvent[] sortedList;
protected long chunkStartNanos;
- public DirectoryConsumer(AccessControlContext acc, Path p) throws IOException {
+ public DirectoryStream(AccessControlContext acc, Path p) throws IOException {
super(acc);
repositoryFiles = new RepositoryFiles(p);
}
@Override
public void process() throws IOException {
- chunkStartNanos = startNanos;
+ StreamConfiguration c1 = configuration;
+ chunkStartNanos = c1.getStartNanos();
Path path;
- if (startTime == EventConsumer.NEXT_EVENT) {
+ if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) {
// TODO: Need to skip forward to the next event
// For now, use the last chunk.
path = repositoryFiles.lastPath();
} else {
path = repositoryFiles.nextPath(chunkStartNanos);
}
+ if (path == null) { // closed
+ return;
+ }
chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
try (RecordingInput input = new RecordingInput(path.toFile())) {
- chunkParser = new ChunkParser(input, this.reuse);
+ chunkParser = new ChunkParser(input, c1.getReuse());
while (!isClosed()) {
boolean awaitnewEvent = false;
while (!isClosed() && !chunkParser.isChunkFinished()) {
- chunkParser.setReuse(this.reuse);
- chunkParser.setOrdered(this.ordered);
- chunkParser.setFirstNanos(startNanos);
+ final StreamConfiguration c2 = configuration;
+ boolean ordered = c2.getOrdered();
+ chunkParser.setReuse(c2.getReuse());
+ chunkParser.setOrdered(ordered);
+ chunkParser.setFirstNanos(c2.getStartNanos());
chunkParser.resetEventCache();
- chunkParser.setParserFilter(this.eventFilter);
+ chunkParser.setParserFilter(c2.getFilter());
chunkParser.updateEventParsers();
+ clearLastDispatch();
if (ordered) {
awaitnewEvent = processOrdered(awaitnewEvent);
} else {
@@ -101,6 +110,7 @@
}
}
+
private boolean processOrdered(boolean awaitNewEvents) throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
@@ -139,39 +149,36 @@
while (true) {
RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
if (e == null) {
- awaitNewEvents = true;
- break;
+ return true;
} else {
dispatch(e);
}
}
- return awaitNewEvents;
}
@Override
public void close() {
+ setClosed(true);
repositoryFiles.close();
}
}
- private final EventConsumer eventConsumer;
+ private final AbstractEventStream eventStream;
public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException {
- eventConsumer = new DirectoryConsumer(acc, p);
- eventConsumer.startTime = startTime;
+ eventStream = new DirectoryStream(acc, p);
+ eventStream.setStartTime(startTime);
}
+ @Override
public void close() {
- eventConsumer.close();
+ eventStream.close();
}
+ @Override
public void onFlush(Runnable action) {
Objects.requireNonNull(action);
- eventConsumer.onFlush(action);
- }
-
- void start(long startNanos) {
- eventConsumer.start(startNanos);
+ eventStream.onFlush(action);
}
@Override
@@ -184,58 +191,62 @@
startAsync(Instant.now().toEpochMilli() * 1000 * 1000L);
}
- void startAsync(long startNanos) {
- eventConsumer.startAsync(startNanos);
- }
-
@Override
public void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- eventConsumer.onEvent(action);
+ eventStream.onEvent(action);
}
@Override
public void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- eventConsumer.onEvent(eventName, action);
+ eventStream.onEvent(eventName, action);
}
@Override
public void onClose(Runnable action) {
Objects.requireNonNull(action);
- eventConsumer.addCloseAction(action);
+ eventStream.addCloseAction(action);
}
@Override
public boolean remove(Object action) {
Objects.requireNonNull(action);
- return eventConsumer.remove(action);
+ return eventStream.remove(action);
}
@Override
public void awaitTermination(Duration timeout) {
Objects.requireNonNull(timeout);
- eventConsumer.awaitTermination(timeout);
+ eventStream.awaitTermination(timeout);
}
@Override
public void awaitTermination() {
- eventConsumer.awaitTermination(Duration.ofMillis(0));
+ eventStream.awaitTermination(Duration.ofMillis(0));
}
@Override
public void setReuse(boolean reuse) {
- eventConsumer.setReuse(reuse);
+ eventStream.setReuse(reuse);
}
@Override
public void setOrdered(boolean ordered) {
- eventConsumer.setOrdered(ordered);
+ eventStream.setOrdered(ordered);
}
@Override
public void setStartTime(Instant startTime) {
- eventConsumer.setStartTime(startTime);
+ eventStream.setStartTime(startTime);
+ }
+
+ public void start(long startNanos) {
+ eventStream.start(startNanos);
+ }
+
+ public void startAsync(long startNanos) {
+ eventStream.startAsync(startNanos);
}
}