33 import java.util.Comparator; |
33 import java.util.Comparator; |
34 import java.util.Objects; |
34 import java.util.Objects; |
35 |
35 |
36 import jdk.jfr.consumer.RecordedEvent; |
36 import jdk.jfr.consumer.RecordedEvent; |
37 import jdk.jfr.internal.JVM; |
37 import jdk.jfr.internal.JVM; |
|
38 import jdk.jfr.internal.PlatformRecording; |
38 import jdk.jfr.internal.Utils; |
39 import jdk.jfr.internal.Utils; |
39 import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration; |
40 import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration; |
40 |
41 |
41 /** |
42 /** |
42 * Implementation of an {@code EventStream}} that operates against a directory |
43 * Implementation of an {@code EventStream}} that operates against a directory |
43 * with chunk files. |
44 * with chunk files. |
44 * |
45 * |
45 */ |
46 */ |
46 public final class EventDirectoryStream extends AbstractEventStream { |
47 public class EventDirectoryStream extends AbstractEventStream { |
47 |
48 |
48 private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator(); |
49 private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator(); |
49 |
50 |
50 private final RepositoryFiles repositoryFiles; |
51 private final RepositoryFiles repositoryFiles; |
51 private final boolean active; |
52 private final PlatformRecording recording; |
52 private final FileAccess fileAccess; |
53 private final FileAccess fileAccess; |
53 |
54 |
54 private ChunkParser currentParser; |
55 private ChunkParser currentParser; |
55 private long currentChunkStartNanos; |
56 private long currentChunkStartNanos; |
56 private RecordedEvent[] sortedCache; |
57 private RecordedEvent[] sortedCache; |
57 private int threadExclusionLevel = 0; |
58 private int threadExclusionLevel = 0; |
58 |
59 |
59 public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException { |
60 public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, PlatformRecording recording) throws IOException { |
60 super(acc, active); |
61 super(acc, recording); |
61 this.fileAccess = Objects.requireNonNull(fileAccess); |
62 this.fileAccess = Objects.requireNonNull(fileAccess); |
62 this.active = active; |
63 this.recording = recording; |
63 this.repositoryFiles = new RepositoryFiles(fileAccess, p); |
64 this.repositoryFiles = new RepositoryFiles(fileAccess, p); |
64 } |
65 } |
65 |
66 |
66 @Override |
67 @Override |
67 public void close() { |
68 public void close() { |
102 |
103 |
103 protected void processRecursionSafe() throws IOException { |
104 protected void processRecursionSafe() throws IOException { |
104 Dispatcher disp = dispatcher(); |
105 Dispatcher disp = dispatcher(); |
105 |
106 |
106 Path path; |
107 Path path; |
107 boolean validStartTime = active || disp.startTime != null; |
108 boolean validStartTime = recording != null || disp.startTime != null; |
108 if (validStartTime) { |
109 if (validStartTime) { |
109 path = repositoryFiles.firstPath(disp.startNanos); |
110 path = repositoryFiles.firstPath(disp.startNanos); |
110 } else { |
111 } else { |
111 path = repositoryFiles.lastPath(); |
112 path = repositoryFiles.lastPath(); |
112 } |
113 } |
137 if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) { |
138 if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) { |
138 close(); |
139 close(); |
139 return; |
140 return; |
140 } |
141 } |
141 } |
142 } |
142 |
143 if (isLastChunk()) { |
|
144 // Recording was stopped/closed externally, and no more data to process. |
|
145 return; |
|
146 } |
|
147 |
|
148 if (repositoryFiles.hasFixedPath() && currentParser.isFinalChunk()) { |
|
149 // JVM process exited/crashed, or repository migrated to an unknown location |
|
150 return; |
|
151 } |
143 if (isClosed()) { |
152 if (isClosed()) { |
|
153 // Stream was closed |
144 return; |
154 return; |
145 } |
155 } |
146 long durationNanos = currentParser.getChunkDuration(); |
156 long durationNanos = currentParser.getChunkDuration(); |
147 if (durationNanos == 0) { |
157 if (durationNanos == 0) { |
148 // Avoid reading the same chunk again and again if |
158 // Avoid reading the same chunk again and again if |
158 currentParser = currentParser.newChunkParser(); |
168 currentParser = currentParser.newChunkParser(); |
159 // TODO: Optimization. No need filter when we reach new chunk |
169 // TODO: Optimization. No need filter when we reach new chunk |
160 // Could set start = 0; |
170 // Could set start = 0; |
161 } |
171 } |
162 } |
172 } |
|
173 } |
|
174 |
|
175 private boolean isLastChunk() { |
|
176 if (recording == null) { |
|
177 return false; |
|
178 } |
|
179 return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos(); |
163 } |
180 } |
164 |
181 |
165 private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException { |
182 private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException { |
166 if (sortedCache == null) { |
183 if (sortedCache == null) { |
167 sortedCache = new RecordedEvent[100_000]; |
184 sortedCache = new RecordedEvent[100_000]; |