26 package jdk.jfr.consumer; |
26 package jdk.jfr.consumer; |
27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
29 import java.nio.file.Path; |
29 import java.nio.file.Path; |
30 import java.security.AccessControlContext; |
30 import java.security.AccessControlContext; |
31 import java.time.Duration; |
|
32 import java.time.Instant; |
31 import java.time.Instant; |
33 import java.util.Arrays; |
32 import java.util.Arrays; |
34 import java.util.Comparator; |
|
35 import java.util.Objects; |
33 import java.util.Objects; |
36 import java.util.function.Consumer; |
|
37 |
34 |
38 import jdk.jfr.internal.Utils; |
35 import jdk.jfr.internal.Utils; |
39 import jdk.jfr.internal.consumer.FileAccess; |
36 import jdk.jfr.internal.consumer.FileAccess; |
40 import jdk.jfr.internal.consumer.RecordingInput; |
37 import jdk.jfr.internal.consumer.RecordingInput; |
41 import jdk.jfr.internal.consumer.RepositoryFiles; |
38 import jdk.jfr.internal.consumer.RepositoryFiles; |
43 /** |
40 /** |
44 * Implementation of an {@code EventStream} that operates against a directory |
41 * Implementation of an {@code EventStream} that operates against a directory |
45 * with chunk files. |
42 * with chunk files. |
46 * |
43 * |
47 */ |
44 */ |
48 class EventDirectoryStream implements EventStream { |
45 final class EventDirectoryStream extends AbstractEventStream { |
|
46 private final RepositoryFiles repositoryFiles; |
|
47 private final boolean active; |
|
48 private final FileAccess fileAccess; |
|
49 private ChunkParser chunkParser; |
|
50 private long chunkStartNanos; |
|
51 private RecordedEvent[] sortedList; |
49 |
52 |
50 static final class DirectoryStream extends AbstractEventStream { |
53 public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException { |
51 |
54 super(acc, active); |
52 private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); |
55 this.fileAccess = Objects.requireNonNull(fileAccess); |
53 private static final int DEFAULT_ARRAY_SIZE = 10_000; |
56 this.active = active; |
54 |
57 this.repositoryFiles = new RepositoryFiles(fileAccess, p); |
55 private final RepositoryFiles repositoryFiles; |
|
56 private final boolean active; |
|
57 private final FileAccess fileAccess; |
|
58 private ChunkParser chunkParser; |
|
59 private RecordedEvent[] sortedList; |
|
60 protected long chunkStartNanos; |
|
61 |
|
62 public DirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException { |
|
63 super(acc, active); |
|
64 this.fileAccess = fileAccess; |
|
65 this.active = active; |
|
66 repositoryFiles = new RepositoryFiles(fileAccess, p); |
|
67 } |
|
68 |
|
69 @Override |
|
70 public void process() throws Exception { |
|
71 final StreamConfiguration c1 = configuration; |
|
72 Path path; |
|
73 boolean validStartTime = active || c1.getStartTime() != null; |
|
74 if (validStartTime) { |
|
75 path = repositoryFiles.firstPath(c1.getStartNanos()); |
|
76 } else { |
|
77 path = repositoryFiles.lastPath(); |
|
78 } |
|
79 if (path == null) { // closed |
|
80 return; |
|
81 } |
|
82 chunkStartNanos = repositoryFiles.getTimestamp(path); |
|
83 try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) { |
|
84 chunkParser = new ChunkParser(input, c1.getReuse()); |
|
85 long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration(); |
|
86 long start = validStartTime ? c1.getStartNanos() : segmentStart; |
|
87 long end = c1.getEndTime() != null ? c1.getEndNanos() : Long.MAX_VALUE; |
|
88 while (!isClosed()) { |
|
89 boolean awaitnewEvent = false; |
|
90 while (!isClosed() && !chunkParser.isChunkFinished()) { |
|
91 final StreamConfiguration c2 = configuration; |
|
92 boolean ordered = c2.getOrdered(); |
|
93 chunkParser.setFlushOperation(flushOperation); |
|
94 chunkParser.setReuse(c2.getReuse()); |
|
95 chunkParser.setOrdered(ordered); |
|
96 chunkParser.setFirstNanos(start); |
|
97 chunkParser.setLastNanos(end); |
|
98 chunkParser.resetEventCache(); |
|
99 chunkParser.setParserFilter(c2.getFilter()); |
|
100 chunkParser.updateEventParsers(); |
|
101 clearLastDispatch(); |
|
102 if (ordered) { |
|
103 awaitnewEvent = processOrdered(awaitnewEvent); |
|
104 } else { |
|
105 awaitnewEvent = processUnordered(awaitnewEvent); |
|
106 } |
|
107 if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > end) { |
|
108 close(); |
|
109 return; |
|
110 } |
|
111 } |
|
112 |
|
113 |
|
114 if (isClosed()) { |
|
115 return; |
|
116 } |
|
117 long durationNanos = chunkParser.getChunkDuration(); |
|
118 path = repositoryFiles.nextPath(chunkStartNanos + durationNanos); |
|
119 if (path == null) { |
|
120 return; // stream closed |
|
121 } |
|
122 chunkStartNanos = repositoryFiles.getTimestamp(path); |
|
123 input.setFile(path); |
|
124 chunkParser = chunkParser.newChunkParser(); |
|
125 // No need to filter once we reach a new chunk |
|
126 // start = 0; |
|
127 } |
|
128 } |
|
129 } |
|
130 |
|
131 private boolean processOrdered(boolean awaitNewEvents) throws IOException { |
|
132 if (sortedList == null) { |
|
133 sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE]; |
|
134 } |
|
135 int index = 0; |
|
136 while (true) { |
|
137 RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); |
|
138 if (e == null) { |
|
139 // wait for new event with next call to |
|
140 // readStreamingEvent() |
|
141 awaitNewEvents = true; |
|
142 break; |
|
143 } |
|
144 awaitNewEvents = false; |
|
145 if (index == sortedList.length) { |
|
146 sortedList = Arrays.copyOf(sortedList, sortedList.length * 2); |
|
147 } |
|
148 sortedList[index++] = e; |
|
149 } |
|
150 |
|
151 // no events found |
|
152 if (index == 0 && chunkParser.isChunkFinished()) { |
|
153 return awaitNewEvents; |
|
154 } |
|
155 // at least 2 events, sort them |
|
156 if (index > 1) { |
|
157 Arrays.sort(sortedList, 0, index, END_TIME); |
|
158 } |
|
159 for (int i = 0; i < index; i++) { |
|
160 dispatch(sortedList[i]); |
|
161 } |
|
162 return awaitNewEvents; |
|
163 } |
|
164 |
|
165 private boolean processUnordered(boolean awaitNewEvents) throws IOException { |
|
166 while (true) { |
|
167 RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); |
|
168 if (e == null) { |
|
169 return true; |
|
170 } else { |
|
171 dispatch(e); |
|
172 } |
|
173 } |
|
174 } |
|
175 |
|
176 @Override |
|
177 public void close() { |
|
178 setClosed(true); |
|
179 repositoryFiles.close(); |
|
180 } |
|
181 } |
|
182 |
|
183 private final AbstractEventStream eventStream; |
|
184 |
|
185 public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess access, boolean active) throws IOException { |
|
186 eventStream = new DirectoryStream(acc, p, access, active); |
|
187 } |
58 } |
188 |
59 |
189 @Override |
60 @Override |
190 public void close() { |
61 public void close() { |
191 eventStream.close(); |
62 setClosed(true); |
192 } |
63 runCloseActions(); |
193 |
64 repositoryFiles.close(); |
194 @Override |
|
195 public void onFlush(Runnable action) { |
|
196 Objects.requireNonNull(action); |
|
197 eventStream.onFlush(action); |
|
198 } |
65 } |
199 |
66 |
200 @Override |
67 @Override |
201 public void start() { |
68 public void start() { |
202 start(Utils.timeToNanos(Instant.now())); |
69 start(Utils.timeToNanos(Instant.now())); |
206 public void startAsync() { |
73 public void startAsync() { |
207 startAsync(Utils.timeToNanos(Instant.now())); |
74 startAsync(Utils.timeToNanos(Instant.now())); |
208 } |
75 } |
209 |
76 |
210 @Override |
77 @Override |
211 public void onEvent(Consumer<RecordedEvent> action) { |
78 public void process() throws Exception { |
212 Objects.requireNonNull(action); |
79 StreamConfiguration c = configuration; |
213 eventStream.onEvent(action); |
80 Path path; |
|
81 boolean validStartTime = active || c.getStartTime() != null; |
|
82 if (validStartTime) { |
|
83 path = repositoryFiles.firstPath(c.getStartNanos()); |
|
84 } else { |
|
85 path = repositoryFiles.lastPath(); |
|
86 } |
|
87 if (path == null) { // closed |
|
88 return; |
|
89 } |
|
90 chunkStartNanos = repositoryFiles.getTimestamp(path); |
|
91 try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) { |
|
92 chunkParser = new ChunkParser(input, c.getReuse()); |
|
93 long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration(); |
|
94 long filtertStart = validStartTime ? c.getStartNanos() : segmentStart; |
|
95 long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE; |
|
96 while (!isClosed()) { |
|
97 boolean awaitnewEvent = false; |
|
98 while (!isClosed() && !chunkParser.isChunkFinished()) { |
|
99 c = configuration; |
|
100 boolean ordered = c.getOrdered(); |
|
101 chunkParser.setFlushOperation(getFlushOperation()); |
|
102 chunkParser.setReuse(c.getReuse()); |
|
103 chunkParser.setOrdered(ordered); |
|
104 chunkParser.setFilterStart(filtertStart); |
|
105 chunkParser.setFilterEnd(filterEnd); |
|
106 chunkParser.resetEventCache(); |
|
107 chunkParser.setParserFilter(c.getFilter()); |
|
108 chunkParser.updateEventParsers(); |
|
109 clearLastDispatch(); |
|
110 if (ordered) { |
|
111 awaitnewEvent = processOrdered(awaitnewEvent); |
|
112 } else { |
|
113 awaitnewEvent = processUnordered(awaitnewEvent); |
|
114 } |
|
115 if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) { |
|
116 close(); |
|
117 return; |
|
118 } |
|
119 } |
|
120 |
|
121 if (isClosed()) { |
|
122 return; |
|
123 } |
|
124 long durationNanos = chunkParser.getChunkDuration(); |
|
125 path = repositoryFiles.nextPath(chunkStartNanos + durationNanos); |
|
126 if (path == null) { |
|
127 return; // stream closed |
|
128 } |
|
129 chunkStartNanos = repositoryFiles.getTimestamp(path); |
|
130 input.setFile(path); |
|
131 chunkParser = chunkParser.newChunkParser(); |
|
132 // TODO: Optimization. No need to filter when we reach a new chunk |
|
133 // Could set start = 0; |
|
134 } |
|
135 } |
214 } |
136 } |
215 |
137 |
216 @Override |
138 private boolean processOrdered(boolean awaitNewEvents) throws IOException { |
217 public void onEvent(String eventName, Consumer<RecordedEvent> action) { |
139 if (sortedList == null) { |
218 Objects.requireNonNull(eventName); |
140 sortedList = new RecordedEvent[100_000]; |
219 Objects.requireNonNull(action); |
141 } |
220 eventStream.onEvent(eventName, action); |
142 int index = 0; |
|
143 while (true) { |
|
144 RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); |
|
145 if (e == null) { |
|
146 // wait for new event with next call to |
|
147 // readStreamingEvent() |
|
148 awaitNewEvents = true; |
|
149 break; |
|
150 } |
|
151 awaitNewEvents = false; |
|
152 if (index == sortedList.length) { |
|
153 sortedList = Arrays.copyOf(sortedList, sortedList.length * 2); |
|
154 } |
|
155 sortedList[index++] = e; |
|
156 } |
|
157 |
|
158 // no events found |
|
159 if (index == 0 && chunkParser.isChunkFinished()) { |
|
160 return awaitNewEvents; |
|
161 } |
|
162 // at least 2 events, sort them |
|
163 if (index > 1) { |
|
164 Arrays.sort(sortedList, 0, index, END_TIME); |
|
165 } |
|
166 for (int i = 0; i < index; i++) { |
|
167 dispatch(sortedList[i]); |
|
168 } |
|
169 return awaitNewEvents; |
221 } |
170 } |
222 |
171 |
223 @Override |
172 private boolean processUnordered(boolean awaitNewEvents) throws IOException { |
224 public void onClose(Runnable action) { |
173 while (true) { |
225 Objects.requireNonNull(action); |
174 RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); |
226 eventStream.addCloseAction(action); |
175 if (e == null) { |
|
176 return true; |
|
177 } else { |
|
178 dispatch(e); |
|
179 } |
|
180 } |
227 } |
181 } |
228 |
|
229 @Override |
|
230 public boolean remove(Object action) { |
|
231 Objects.requireNonNull(action); |
|
232 return eventStream.remove(action); |
|
233 } |
|
234 |
|
235 @Override |
|
236 public void awaitTermination(Duration timeout) { |
|
237 Objects.requireNonNull(timeout); |
|
238 eventStream.awaitTermination(timeout); |
|
239 } |
|
240 |
|
241 @Override |
|
242 public void awaitTermination() { |
|
243 eventStream.awaitTermination(Duration.ofMillis(0)); |
|
244 } |
|
245 |
|
246 @Override |
|
247 public void setReuse(boolean reuse) { |
|
248 eventStream.setReuse(reuse); |
|
249 } |
|
250 |
|
251 @Override |
|
252 public void setOrdered(boolean ordered) { |
|
253 eventStream.setOrdered(ordered); |
|
254 } |
|
255 |
|
256 @Override |
|
257 public void setStartTime(Instant startTime) { |
|
258 eventStream.setStartTime(startTime); |
|
259 } |
|
260 |
|
261 @Override |
|
262 public void setEndTime(Instant endTime) { |
|
263 eventStream.setEndTime(endTime); |
|
264 } |
|
265 |
|
266 |
|
267 public void start(long startNanos) { |
|
268 eventStream.start(startNanos); |
|
269 } |
|
270 |
|
271 public void startAsync(long startNanos) { |
|
272 eventStream.startAsync(startNanos); |
|
273 } |
|
274 |
|
275 @Override |
|
276 public void onError(Consumer<Throwable> action) { |
|
277 // TODO Auto-generated method stub |
|
278 |
|
279 } |
|
280 |
|
281 |
|
282 } |
182 } |