33 import java.util.Arrays; |
33 import java.util.Arrays; |
34 import java.util.Comparator; |
34 import java.util.Comparator; |
35 import java.util.Objects; |
35 import java.util.Objects; |
36 import java.util.function.Consumer; |
36 import java.util.function.Consumer; |
37 |
37 |
38 import jdk.jfr.internal.consumer.EventConsumer; |
|
39 import jdk.jfr.internal.consumer.RecordingInput; |
38 import jdk.jfr.internal.consumer.RecordingInput; |
40 import jdk.jfr.internal.consumer.RepositoryFiles; |
39 import jdk.jfr.internal.consumer.RepositoryFiles; |
41 |
40 |
42 /** |
41 /** |
43 * Implementation of an {@code EventStream} that operates against a directory |
42 * Implementation of an {@code EventStream} that operates against a directory |
44 * with chunk files. |
43 * with chunk files. |
45 * |
44 * |
46 */ |
45 */ |
47 final class EventDirectoryStream implements EventStream { |
46 final class EventDirectoryStream implements EventStream { |
48 |
47 |
49 static final class ParserConsumer extends EventConsumer { |
48 static final class DirectoryConsumer extends EventConsumer { |
50 |
49 |
51 private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime); |
50 private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime); |
52 private static final int DEFAULT_ARRAY_SIZE = 10_000; |
51 private static final int DEFAULT_ARRAY_SIZE = 10_000; |
53 private final RepositoryFiles repositoryFiles; |
52 private final RepositoryFiles repositoryFiles; |
54 private ChunkParser chunkParser; |
53 private ChunkParser chunkParser; |
55 private RecordedEvent[] sortedList; |
54 private RecordedEvent[] sortedList; |
56 |
55 protected long chunkStartNanos; |
57 public ParserConsumer(AccessControlContext acc, Path p) throws IOException { |
56 |
|
57 public DirectoryConsumer(AccessControlContext acc, Path p) throws IOException { |
58 super(acc); |
58 super(acc); |
59 repositoryFiles = new RepositoryFiles(p); |
59 repositoryFiles = new RepositoryFiles(p); |
60 } |
60 } |
61 |
61 |
62 @Override |
62 @Override |
63 public void process() throws IOException { |
63 public void process() throws IOException { |
64 Path path = repositoryFiles.nextPath(startNanos); |
64 chunkStartNanos = startNanos; |
65 startNanos = repositoryFiles.getTimestamp(path) + 1; |
65 Path path; |
|
66 if (startTime == EventConsumer.NEXT_EVENT) { |
|
67 // TODO: Need to skip forward to the next event |
|
68 // For now, use the last chunk. |
|
69 path = repositoryFiles.lastPath(); |
|
70 } else { |
|
71 path = repositoryFiles.nextPath(chunkStartNanos); |
|
72 } |
|
73 chunkStartNanos = repositoryFiles.getTimestamp(path) + 1; |
66 try (RecordingInput input = new RecordingInput(path.toFile())) { |
74 try (RecordingInput input = new RecordingInput(path.toFile())) { |
67 chunkParser = new ChunkParser(input, this.reuse); |
75 chunkParser = new ChunkParser(input, this.reuse); |
68 while (!isClosed()) { |
76 while (!isClosed()) { |
69 boolean awaitnewEvent = false; |
77 boolean awaitnewEvent = false; |
70 while (!isClosed() && !chunkParser.isChunkFinished()) { |
78 while (!isClosed() && !chunkParser.isChunkFinished()) { |
145 } |
153 } |
146 } |
154 } |
147 |
155 |
148 private final EventConsumer eventConsumer; |
156 private final EventConsumer eventConsumer; |
149 |
157 |
150 public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException { |
158 public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException { |
151 eventConsumer = new ParserConsumer(acc, p); |
159 eventConsumer = new DirectoryConsumer(acc, p); |
|
160 eventConsumer.startTime = startTime; |
152 } |
161 } |
153 |
162 |
154 public void close() { |
163 public void close() { |
155 eventConsumer.close(); |
164 eventConsumer.close(); |
156 } |
165 } |