--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon Sep 02 21:08:41 2019 +0200
@@ -28,12 +28,9 @@
import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessControlContext;
-import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.Objects;
-import java.util.function.Consumer;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.FileAccess;
@@ -45,156 +42,26 @@
* with chunk files.
*
*/
-class EventDirectoryStream implements EventStream {
-
- static final class DirectoryStream extends AbstractEventStream {
-
- private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
- private static final int DEFAULT_ARRAY_SIZE = 10_000;
-
- private final RepositoryFiles repositoryFiles;
- private final boolean active;
- private final FileAccess fileAccess;
- private ChunkParser chunkParser;
- private RecordedEvent[] sortedList;
- protected long chunkStartNanos;
-
- public DirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
- super(acc, active);
- this.fileAccess = fileAccess;
- this.active = active;
- repositoryFiles = new RepositoryFiles(fileAccess, p);
- }
-
- @Override
- public void process() throws Exception {
- final StreamConfiguration c1 = configuration;
- Path path;
- boolean validStartTime = active || c1.getStartTime() != null;
- if (validStartTime) {
- path = repositoryFiles.firstPath(c1.getStartNanos());
- } else {
- path = repositoryFiles.lastPath();
- }
- if (path == null) { // closed
- return;
- }
- chunkStartNanos = repositoryFiles.getTimestamp(path);
- try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
- chunkParser = new ChunkParser(input, c1.getReuse());
- long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
- long start = validStartTime ? c1.getStartNanos() : segmentStart;
- long end = c1.getEndTime() != null ? c1.getEndNanos() : Long.MAX_VALUE;
- while (!isClosed()) {
- boolean awaitnewEvent = false;
- while (!isClosed() && !chunkParser.isChunkFinished()) {
- final StreamConfiguration c2 = configuration;
- boolean ordered = c2.getOrdered();
- chunkParser.setFlushOperation(flushOperation);
- chunkParser.setReuse(c2.getReuse());
- chunkParser.setOrdered(ordered);
- chunkParser.setFirstNanos(start);
- chunkParser.setLastNanos(end);
- chunkParser.resetEventCache();
- chunkParser.setParserFilter(c2.getFilter());
- chunkParser.updateEventParsers();
- clearLastDispatch();
- if (ordered) {
- awaitnewEvent = processOrdered(awaitnewEvent);
- } else {
- awaitnewEvent = processUnordered(awaitnewEvent);
- }
- if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > end) {
- close();
- return;
- }
- }
-
+final class EventDirectoryStream extends AbstractEventStream {
+ private final RepositoryFiles repositoryFiles;
+ private final boolean active;
+ private final FileAccess fileAccess;
+ private ChunkParser chunkParser;
+ private long chunkStartNanos;
+ private RecordedEvent[] sortedList;
- if (isClosed()) {
- return;
- }
- long durationNanos = chunkParser.getChunkDuration();
- path = repositoryFiles.nextPath(chunkStartNanos + durationNanos);
- if (path == null) {
- return; // stream closed
- }
- chunkStartNanos = repositoryFiles.getTimestamp(path);
- input.setFile(path);
- chunkParser = chunkParser.newChunkParser();
- // No need filter when we reach new chunk
- // start = 0;
- }
- }
- }
-
- private boolean processOrdered(boolean awaitNewEvents) throws IOException {
- if (sortedList == null) {
- sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
- }
- int index = 0;
- while (true) {
- RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
- if (e == null) {
- // wait for new event with next call to
- // readStreamingEvent()
- awaitNewEvents = true;
- break;
- }
- awaitNewEvents = false;
- if (index == sortedList.length) {
- sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
- }
- sortedList[index++] = e;
- }
-
- // no events found
- if (index == 0 && chunkParser.isChunkFinished()) {
- return awaitNewEvents;
- }
- // at least 2 events, sort them
- if (index > 1) {
- Arrays.sort(sortedList, 0, index, END_TIME);
- }
- for (int i = 0; i < index; i++) {
- dispatch(sortedList[i]);
- }
- return awaitNewEvents;
- }
-
- private boolean processUnordered(boolean awaitNewEvents) throws IOException {
- while (true) {
- RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
- if (e == null) {
- return true;
- } else {
- dispatch(e);
- }
- }
- }
-
- @Override
- public void close() {
- setClosed(true);
- repositoryFiles.close();
- }
- }
-
- private final AbstractEventStream eventStream;
-
- public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess access, boolean active) throws IOException {
- eventStream = new DirectoryStream(acc, p, access, active);
+ public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
+ super(acc, active);
+ this.fileAccess = Objects.requireNonNull(fileAccess);
+ this.active = active;
+ this.repositoryFiles = new RepositoryFiles(fileAccess, p);
}
@Override
public void close() {
- eventStream.close();
- }
-
- @Override
- public void onFlush(Runnable action) {
- Objects.requireNonNull(action);
- eventStream.onFlush(action);
+ setClosed(true);
+ runCloseActions();
+ repositoryFiles.close();
}
@Override
@@ -208,75 +75,108 @@
}
@Override
- public void onEvent(Consumer<RecordedEvent> action) {
- Objects.requireNonNull(action);
- eventStream.onEvent(action);
- }
-
- @Override
- public void onEvent(String eventName, Consumer<RecordedEvent> action) {
- Objects.requireNonNull(eventName);
- Objects.requireNonNull(action);
- eventStream.onEvent(eventName, action);
- }
+ public void process() throws Exception {
+ StreamConfiguration c = configuration;
+ Path path;
+ boolean validStartTime = active || c.getStartTime() != null;
+ if (validStartTime) {
+ path = repositoryFiles.firstPath(c.getStartNanos());
+ } else {
+ path = repositoryFiles.lastPath();
+ }
+ if (path == null) { // closed
+ return;
+ }
+ chunkStartNanos = repositoryFiles.getTimestamp(path);
+ try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
+ chunkParser = new ChunkParser(input, c.getReuse());
+ long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
+ long filtertStart = validStartTime ? c.getStartNanos() : segmentStart;
+ long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE;
+ while (!isClosed()) {
+ boolean awaitnewEvent = false;
+ while (!isClosed() && !chunkParser.isChunkFinished()) {
+ c = configuration;
+ boolean ordered = c.getOrdered();
+ chunkParser.setFlushOperation(getFlushOperation());
+ chunkParser.setReuse(c.getReuse());
+ chunkParser.setOrdered(ordered);
+ chunkParser.setFilterStart(filtertStart);
+ chunkParser.setFilterEnd(filterEnd);
+ chunkParser.resetEventCache();
+ chunkParser.setParserFilter(c.getFilter());
+ chunkParser.updateEventParsers();
+ clearLastDispatch();
+ if (ordered) {
+ awaitnewEvent = processOrdered(awaitnewEvent);
+ } else {
+ awaitnewEvent = processUnordered(awaitnewEvent);
+ }
+ if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
+ close();
+ return;
+ }
+ }
- @Override
- public void onClose(Runnable action) {
- Objects.requireNonNull(action);
- eventStream.addCloseAction(action);
- }
-
- @Override
- public boolean remove(Object action) {
- Objects.requireNonNull(action);
- return eventStream.remove(action);
- }
-
- @Override
- public void awaitTermination(Duration timeout) {
- Objects.requireNonNull(timeout);
- eventStream.awaitTermination(timeout);
- }
-
- @Override
- public void awaitTermination() {
- eventStream.awaitTermination(Duration.ofMillis(0));
+ if (isClosed()) {
+ return;
+ }
+ long durationNanos = chunkParser.getChunkDuration();
+ path = repositoryFiles.nextPath(chunkStartNanos + durationNanos);
+ if (path == null) {
+ return; // stream closed
+ }
+ chunkStartNanos = repositoryFiles.getTimestamp(path);
+ input.setFile(path);
+ chunkParser = chunkParser.newChunkParser();
+ // TODO: Optimization. No need filter when we reach new chunk
+ // Could set start = 0;
+ }
+ }
}
- @Override
- public void setReuse(boolean reuse) {
- eventStream.setReuse(reuse);
- }
+ private boolean processOrdered(boolean awaitNewEvents) throws IOException {
+ if (sortedList == null) {
+ sortedList = new RecordedEvent[100_000];
+ }
+ int index = 0;
+ while (true) {
+ RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+ if (e == null) {
+ // wait for new event with next call to
+ // readStreamingEvent()
+ awaitNewEvents = true;
+ break;
+ }
+ awaitNewEvents = false;
+ if (index == sortedList.length) {
+ sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
+ }
+ sortedList[index++] = e;
+ }
- @Override
- public void setOrdered(boolean ordered) {
- eventStream.setOrdered(ordered);
- }
-
- @Override
- public void setStartTime(Instant startTime) {
- eventStream.setStartTime(startTime);
+ // no events found
+ if (index == 0 && chunkParser.isChunkFinished()) {
+ return awaitNewEvents;
+ }
+ // at least 2 events, sort them
+ if (index > 1) {
+ Arrays.sort(sortedList, 0, index, END_TIME);
+ }
+ for (int i = 0; i < index; i++) {
+ dispatch(sortedList[i]);
+ }
+ return awaitNewEvents;
}
- @Override
- public void setEndTime(Instant endTime) {
- eventStream.setEndTime(endTime);
- }
-
-
- public void start(long startNanos) {
- eventStream.start(startNanos);
+ private boolean processUnordered(boolean awaitNewEvents) throws IOException {
+ while (true) {
+ RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+ if (e == null) {
+ return true;
+ } else {
+ dispatch(e);
+ }
+ }
}
-
- public void startAsync(long startNanos) {
- eventStream.startAsync(startNanos);
- }
-
- @Override
- public void onError(Consumer<Throwable> action) {
- // TODO Auto-generated method stub
-
- }
-
-
}