equal
deleted
inserted
replaced
45 */ |
45 */ |
46 final class EventDirectoryStream implements EventStream { |
46 final class EventDirectoryStream implements EventStream { |
47 |
47 |
48 static final class DirectoryConsumer extends EventConsumer { |
48 static final class DirectoryConsumer extends EventConsumer { |
49 |
49 |
50 private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime); |
50 private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); |
51 private static final int DEFAULT_ARRAY_SIZE = 10_000; |
51 private static final int DEFAULT_ARRAY_SIZE = 10_000; |
52 private final RepositoryFiles repositoryFiles; |
52 private final RepositoryFiles repositoryFiles; |
53 private ChunkParser chunkParser; |
53 private ChunkParser chunkParser; |
54 private RecordedEvent[] sortedList; |
54 private RecordedEvent[] sortedList; |
55 protected long chunkStartNanos; |
55 protected long chunkStartNanos; |
76 while (!isClosed()) { |
76 while (!isClosed()) { |
77 boolean awaitnewEvent = false; |
77 boolean awaitnewEvent = false; |
78 while (!isClosed() && !chunkParser.isChunkFinished()) { |
78 while (!isClosed() && !chunkParser.isChunkFinished()) { |
79 chunkParser.setReuse(this.reuse); |
79 chunkParser.setReuse(this.reuse); |
80 chunkParser.setOrdered(this.ordered); |
80 chunkParser.setOrdered(this.ordered); |
|
81 chunkParser.setFirstNanos(startNanos); |
81 chunkParser.resetEventCache(); |
82 chunkParser.resetEventCache(); |
82 chunkParser.setParserFilter(this.eventFilter); |
83 chunkParser.setParserFilter(this.eventFilter); |
83 chunkParser.updateEventParsers(); |
84 chunkParser.updateEventParsers(); |
84 if (ordered) { |
85 if (ordered) { |
85 awaitnewEvent = processOrdered(awaitnewEvent); |
86 awaitnewEvent = processOrdered(awaitnewEvent); |