--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Mon Sep 02 21:08:41 2019 +0200
@@ -28,12 +28,8 @@
import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.time.Duration;
-import java.time.Instant;
import java.util.Arrays;
import java.util.Objects;
-import java.util.function.Consumer;
import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.RecordingInput;
@@ -42,196 +38,110 @@
* Implementation of an event stream that operates against a recording file.
*
*/
-final class EventFileStream implements EventStream {
-
- private final static class FileStream extends AbstractEventStream {
- private static final int DEFAULT_ARRAY_SIZE = 100_000;
-
- private final RecordingInput input;
-
- private ChunkParser chunkParser;
- private RecordedEvent[] sortedList;
-
- public FileStream(AccessControlContext acc, Path path) throws IOException {
- super(acc, false);
- this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
-; }
-
- @Override
- public void process() throws IOException {
- final StreamConfiguration c1 = configuration;
- long start = 0;
- long end = Long.MAX_VALUE;
- if (c1.getStartTime() != null) {
- start = c1.getStartNanos();
- }
- if (c1.getEndTime() != null) {
- end = c1.getEndNanos();
- }
+final class EventFileStream extends AbstractEventStream {
+ private final RecordingInput input;
+ private ChunkParser chunkParser;
+ private RecordedEvent[] sortedList;
- chunkParser = new ChunkParser(input, c1.getReuse());
- while (!isClosed()) {
- if (chunkParser.getStartNanos() > end) {
- close();
- return;
- }
- StreamConfiguration c2 = configuration;
- boolean ordered = c2.getOrdered();
- chunkParser.setFlushOperation(flushOperation);
- chunkParser.setFirstNanos(start);
- chunkParser.setLastNanos(end);
- chunkParser.setReuse(c2.getReuse());
- chunkParser.setOrdered(ordered);
- chunkParser.resetEventCache();
- chunkParser.setParserFilter(c2.getFiler());
- chunkParser.updateEventParsers();
- clearLastDispatch();
- if (ordered) {
- processOrdered();
- } else {
- processUnordered();
- }
- if (chunkParser.isLastChunk()) {
- return;
- }
- chunkParser = chunkParser.nextChunkParser();
- }
- }
-
- private void processOrdered() throws IOException {
- if (sortedList == null) {
- sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
- }
- RecordedEvent event;
- int index = 0;
- while (true) {
- event = chunkParser.readEvent();
- if (event == null) {
- Arrays.sort(sortedList, 0, index, END_TIME);
- for (int i = 0; i < index; i++) {
- dispatch(sortedList[i]);
- }
- return;
- }
- if (index == sortedList.length) {
- RecordedEvent[] tmp = sortedList;
- sortedList = new RecordedEvent[2 * tmp.length];
- System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
- }
- sortedList[index++] = event;
- }
- }
-
- private void processUnordered() throws IOException {
- RecordedEvent event;
- while (!isClosed()) {
- event = chunkParser.readEvent();
- if (event == null) {
- return;
- }
- dispatch(event);
- }
- }
-
- @Override
- public void close() {
- setClosed(true);;
- runCloseActions();
- try {
- input.close();
- } catch (IOException e) {
- // ignore
- }
- }
- }
-
- private final FileStream eventStream;
-
- public EventFileStream(Path path, Instant from, Instant to) throws IOException {
+ public EventFileStream(AccessControlContext acc, Path path) throws IOException {
+ super(acc, false);
Objects.requireNonNull(path);
- eventStream = new FileStream(AccessController.getContext(), path);
+ this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
}
@Override
- public void onEvent(Consumer<RecordedEvent> action) {
- Objects.requireNonNull(action);
- eventStream.onEvent(action);
+ public void start() {
+ start(0);
}
@Override
- public void onEvent(String eventName, Consumer<RecordedEvent> action) {
- Objects.requireNonNull(eventName);
- Objects.requireNonNull(action);
- eventStream.onEvent(eventName, action);
- }
-
- @Override
- public void onFlush(Runnable action) {
- Objects.requireNonNull(action);
- eventStream.onFlush(action);
- }
-
- @Override
- public void onClose(Runnable action) {
- Objects.requireNonNull(action);
- eventStream.addCloseAction(action);
+ public void startAsync() {
+ startAsync(0);
}
@Override
public void close() {
- eventStream.close();
- }
-
- @Override
- public boolean remove(Object action) {
- Objects.requireNonNull(action);
- return eventStream.remove(action);
- }
-
- @Override
- public void start() {
- eventStream.start(0);
- }
-
- @Override
- public void setReuse(boolean reuse) {
- eventStream.setReuse(reuse);
- }
-
- @Override
- public void startAsync() {
- eventStream.startAsync(0);
+ setClosed(true);
+ runCloseActions();
+ try {
+ input.close();
+ } catch (IOException e) {
+ // ignore
+ }
}
@Override
- public void awaitTermination(Duration timeout) {
- Objects.requireNonNull(timeout);
- eventStream.awaitTermination(timeout);
- }
+ public void process() throws IOException {
+ StreamConfiguration c = configuration;
+ long start = 0;
+ long end = Long.MAX_VALUE;
+ if (c.getStartTime() != null) {
+ start = c.getStartNanos();
+ }
+ if (c.getEndTime() != null) {
+ end = c.getEndNanos();
+ }
- @Override
- public void awaitTermination() {
- eventStream.awaitTermination();
- }
-
- @Override
- public void setOrdered(boolean ordered) {
- eventStream.setOrdered(ordered);
+ chunkParser = new ChunkParser(input, c.getReuse());
+ while (!isClosed()) {
+ if (chunkParser.getStartNanos() > end) {
+ close();
+ return;
+ }
+ c = configuration;
+ boolean ordered = c.getOrdered();
+ chunkParser.setFlushOperation(getFlushOperation());
+ chunkParser.setFilterStart(start);
+ chunkParser.setFilterEnd(end);
+ chunkParser.setReuse(c.getReuse());
+ chunkParser.setOrdered(ordered);
+ chunkParser.resetEventCache();
+ chunkParser.setParserFilter(c.getFiler());
+ chunkParser.updateEventParsers();
+ clearLastDispatch();
+ if (ordered) {
+ processOrdered();
+ } else {
+ processUnordered();
+ }
+ if (chunkParser.isLastChunk()) {
+ return;
+ }
+ chunkParser = chunkParser.nextChunkParser();
+ }
}
- @Override
- public void setStartTime(Instant startTime) {
- eventStream.setStartTime(startTime);
+ private void processOrdered() throws IOException {
+ if (sortedList == null) {
+ sortedList = new RecordedEvent[10_000];
+ }
+ RecordedEvent event;
+ int index = 0;
+ while (true) {
+ event = chunkParser.readEvent();
+ if (event == null) {
+ Arrays.sort(sortedList, 0, index, END_TIME);
+ for (int i = 0; i < index; i++) {
+ dispatch(sortedList[i]);
+ }
+ return;
+ }
+ if (index == sortedList.length) {
+ RecordedEvent[] tmp = sortedList;
+ sortedList = new RecordedEvent[2 * tmp.length];
+ System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
+ }
+ sortedList[index++] = event;
+ }
}
- @Override
- public void setEndTime(Instant endTime) {
- eventStream.setEndTime(endTime);
- }
-
- @Override
- public void onError(Consumer<Throwable> action) {
- // TODO Auto-generated method stub
-
+ private void processUnordered() throws IOException {
+ while (!isClosed()) {
+ RecordedEvent event = chunkParser.readEvent();
+ if (event == null) {
+ return;
+ }
+ dispatch(event);
+ }
}
}