30 import java.security.AccessControlContext; |
30 import java.security.AccessControlContext; |
31 import java.time.Instant; |
31 import java.time.Instant; |
32 import java.util.Arrays; |
32 import java.util.Arrays; |
33 import java.util.Objects; |
33 import java.util.Objects; |
34 |
34 |
|
35 import jdk.jfr.consumer.ChunkParser.ParserConfiguration; |
35 import jdk.jfr.internal.Utils; |
36 import jdk.jfr.internal.Utils; |
36 import jdk.jfr.internal.consumer.FileAccess; |
37 import jdk.jfr.internal.consumer.FileAccess; |
37 import jdk.jfr.internal.consumer.RecordingInput; |
38 import jdk.jfr.internal.consumer.RecordingInput; |
38 import jdk.jfr.internal.consumer.RepositoryFiles; |
39 import jdk.jfr.internal.consumer.RepositoryFiles; |
39 |
40 |
44 */ |
45 */ |
45 final class EventDirectoryStream extends AbstractEventStream { |
46 final class EventDirectoryStream extends AbstractEventStream { |
46 private final RepositoryFiles repositoryFiles; |
47 private final RepositoryFiles repositoryFiles; |
47 private final boolean active; |
48 private final boolean active; |
48 private final FileAccess fileAccess; |
49 private final FileAccess fileAccess; |
|
50 |
49 private ChunkParser chunkParser; |
51 private ChunkParser chunkParser; |
50 private long chunkStartNanos; |
52 private long chunkStartNanos; |
51 private RecordedEvent[] sortedList; |
53 private RecordedEvent[] sortedList; |
52 |
54 |
53 EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException { |
55 EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException { |
74 startAsync(Utils.timeToNanos(Instant.now())); |
76 startAsync(Utils.timeToNanos(Instant.now())); |
75 } |
77 } |
76 |
78 |
77 @Override |
79 @Override |
78 protected void process() throws Exception { |
80 protected void process() throws Exception { |
79 StreamConfiguration c = configuration; |
81 Dispatcher disp = dispatcher(); |
|
82 |
80 Path path; |
83 Path path; |
81 boolean validStartTime = active || c.getStartTime() != null; |
84 boolean validStartTime = active || disp.startTime != null; |
82 if (validStartTime) { |
85 if (validStartTime) { |
83 path = repositoryFiles.firstPath(c.getStartNanos()); |
86 path = repositoryFiles.firstPath(disp.startNanos); |
84 } else { |
87 } else { |
85 path = repositoryFiles.lastPath(); |
88 path = repositoryFiles.lastPath(); |
86 } |
89 } |
87 if (path == null) { // closed |
90 if (path == null) { // closed |
88 return; |
91 return; |
89 } |
92 } |
90 chunkStartNanos = repositoryFiles.getTimestamp(path); |
93 chunkStartNanos = repositoryFiles.getTimestamp(path); |
91 try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) { |
94 try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) { |
92 chunkParser = new ChunkParser(input, c.getReuse()); |
95 chunkParser = new ChunkParser(input, disp.parserConfiguration); |
93 long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration(); |
96 long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration(); |
94 long filtertStart = validStartTime ? c.getStartNanos() : segmentStart; |
97 long filterStart = validStartTime ? disp.startNanos : segmentStart; |
95 long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE; |
98 long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE; |
|
99 |
96 while (!isClosed()) { |
100 while (!isClosed()) { |
97 boolean awaitnewEvent = false; |
101 boolean awaitnewEvent = false; |
98 while (!isClosed() && !chunkParser.isChunkFinished()) { |
102 while (!isClosed() && !chunkParser.isChunkFinished()) { |
99 c = configuration; |
103 disp = dispatcher(); |
100 boolean ordered = c.getOrdered(); |
104 ParserConfiguration pc = disp.parserConfiguration; |
|
105 pc.filterStart = filterStart; |
|
106 pc.filterEnd = filterEnd; |
|
107 chunkParser.updateConfiguration(pc, true); |
101 chunkParser.setFlushOperation(getFlushOperation()); |
108 chunkParser.setFlushOperation(getFlushOperation()); |
102 chunkParser.setReuse(c.getReuse()); |
109 if (pc.ordered) { |
103 chunkParser.setOrdered(ordered); |
110 awaitnewEvent = processOrdered(disp, awaitnewEvent); |
104 chunkParser.setFilterStart(filtertStart); |
|
105 chunkParser.setFilterEnd(filterEnd); |
|
106 chunkParser.resetEventCache(); |
|
107 chunkParser.setParserFilter(c.getFilter()); |
|
108 chunkParser.updateEventParsers(); |
|
109 c.clearDispatchCache(); |
|
110 if (ordered) { |
|
111 awaitnewEvent = processOrdered(c, awaitnewEvent); |
|
112 } else { |
111 } else { |
113 awaitnewEvent = processUnordered(c, awaitnewEvent); |
112 awaitnewEvent = processUnordered(disp, awaitnewEvent); |
114 } |
113 } |
115 if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) { |
114 if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) { |
116 close(); |
115 close(); |
117 return; |
116 return; |
118 } |
117 } |
162 // at least 2 events, sort them |
161 // at least 2 events, sort them |
163 if (index > 1) { |
162 if (index > 1) { |
164 Arrays.sort(sortedList, 0, index, END_TIME); |
163 Arrays.sort(sortedList, 0, index, END_TIME); |
165 } |
164 } |
166 for (int i = 0; i < index; i++) { |
165 for (int i = 0; i < index; i++) { |
167 dispatch(c, sortedList[i]); |
166 c.dispatch(sortedList[i]); |
168 } |
167 } |
169 return awaitNewEvents; |
168 return awaitNewEvents; |
170 } |
169 } |
171 |
170 |
/**
 * Reads streaming events from {@code chunkParser} one at a time and
 * dispatches each of them immediately, in the order the parser returns them.
 *
 * NOTE(review): two revisions of this method are interleaved line by line
 * below. One takes a {@code StreamConfiguration} and calls
 * {@code dispatch(c, e)}; the other takes a {@code Dispatcher} and calls
 * {@code c.dispatch(e)}. The control flow is the same in both.
 *
 * @param c the object used to deliver each event (see the note above)
 * @param awaitNewEvents passed unchanged to
 *        {@code chunkParser.readStreamingEvent} on every iteration
 * @return always {@code true}; the loop exits only when the parser
 *         returns {@code null}
 * @throws IOException declared by the signature; presumably propagated from
 *         {@code chunkParser.readStreamingEvent} -- TODO confirm
 */
172 private boolean processUnordered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
171 private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
173 while (true) {
172 while (true) {
174 RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
173 RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
// A null event ends the loop. Presumably it means no further event is
// available from the parser right now -- confirm against ChunkParser.
175 if (e == null) {
174 if (e == null) {
176 return true;
175 return true;
177 } else {
176 } else {
178 dispatch(c, e);
177 c.dispatch(e);
179 }
178 }
180 }
179 }
181 }
180 }
182 } |
181 } |