--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Wed Jul 31 14:07:44 2019 +0200
@@ -43,39 +43,41 @@
*/
final class EventFileStream implements EventStream {
- private final static class FileConsumer extends EventConsumer {
+ private final static class FileStream extends AbstractEventStream {
private static final int DEFAULT_ARRAY_SIZE = 100_000;
+
private final RecordingInput input;
+
private ChunkParser chunkParser;
- private boolean reuse = true;
private RecordedEvent[] sortedList;
- private boolean ordered;
- public FileConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
+ public FileStream(AccessControlContext acc, Path path) throws IOException {
super(acc);
- this.input = input;
- }
+ this.input = new RecordingInput(path.toFile());
+; }
@Override
public void process() throws IOException {
- chunkParser = new ChunkParser(input, reuse);
+ StreamConfiguration c1 = configuration;
+ chunkParser = new ChunkParser(input, c1.getReuse());
while (!isClosed()) {
- boolean reuse = this.reuse;
- boolean ordered = this.ordered;
- chunkParser.setReuse(reuse);
+ StreamConfiguration c2 = configuration;
+ boolean ordered = c2.getOrdered();
+ chunkParser.setReuse(c2.getReuse());
chunkParser.setOrdered(ordered);
chunkParser.resetEventCache();
- chunkParser.setParserFilter(eventFilter);
+ chunkParser.setParserFilter(c2.getFiler());
chunkParser.updateEventParsers();
+ clearLastDispatch();
if (ordered) {
processOrdered();
} else {
processUnordered();
}
+ runFlushActions();
if (chunkParser.isLastChunk()) {
return;
}
- runFlushActions();
chunkParser = chunkParser.nextChunkParser();
}
}
@@ -115,106 +117,94 @@
}
}
- public void setReuse(boolean reuse) {
- this.reuse = reuse;
- }
-
- public void setOrdered(boolean ordered) {
- this.ordered = ordered;
- }
-
@Override
public void close() {
-
+ setClosed(true);;
+ runCloseActions();
+ try {
+ input.close();
+ } catch (IOException e) {
+ // ignore
+ }
}
-
-
}
- private final RecordingInput input;
- private final FileConsumer eventConsumer;
+ private final FileStream eventStream;
public EventFileStream(Path path, Instant from, Instant to) throws IOException {
Objects.requireNonNull(path);
- input = new RecordingInput(path.toFile());
- eventConsumer = new FileConsumer(AccessController.getContext(), input);
+ eventStream = new FileStream(AccessController.getContext(), path);
}
@Override
public void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- eventConsumer.onEvent(action);
+ eventStream.onEvent(action);
}
@Override
public void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- eventConsumer.onEvent(eventName, action);
+ eventStream.onEvent(eventName, action);
}
@Override
public void onFlush(Runnable action) {
Objects.requireNonNull(action);
- eventConsumer.onFlush(action);
+ eventStream.onFlush(action);
}
@Override
public void onClose(Runnable action) {
Objects.requireNonNull(action);
- eventConsumer.addCloseAction(action);
+ eventStream.addCloseAction(action);
}
@Override
public void close() {
- eventConsumer.setClosed(true);
- eventConsumer.runCloseActions();
- try {
- input.close();
- } catch (IOException e) {
- // ignore
- }
+ eventStream.close();
}
@Override
public boolean remove(Object action) {
Objects.requireNonNull(action);
- return eventConsumer.remove(action);
+ return eventStream.remove(action);
}
@Override
public void start() {
- eventConsumer.start(0);
+ eventStream.start(0);
}
@Override
public void setReuse(boolean reuse) {
- eventConsumer.setReuse(reuse);
+ eventStream.setReuse(reuse);
}
@Override
public void startAsync() {
- eventConsumer.startAsync(0);
+ eventStream.startAsync(0);
}
@Override
public void awaitTermination(Duration timeout) {
Objects.requireNonNull(timeout);
- eventConsumer.awaitTermination(timeout);
+ eventStream.awaitTermination(timeout);
}
@Override
public void awaitTermination() {
- eventConsumer.awaitTermination();
+ eventStream.awaitTermination();
}
@Override
public void setOrdered(boolean ordered) {
- eventConsumer.setOrdered(ordered);
+ eventStream.setOrdered(ordered);
}
@Override
public void setStartTime(Instant startTime) {
- eventConsumer.setStartTime(startTime);
+ eventStream.setStartTime(startTime);
}
}