103 } |
103 } |
104 } |
104 } |
105 } |
105 } |
106 |
106 |
107 protected void processRecursionSafe() throws IOException { |
107 protected void processRecursionSafe() throws IOException { |
|
108 Dispatcher lastDisp = null; |
108 Dispatcher disp = dispatcher(); |
109 Dispatcher disp = dispatcher(); |
109 |
|
110 Path path; |
110 Path path; |
111 boolean validStartTime = recording != null || disp.startTime != null; |
111 boolean validStartTime = recording != null || disp.startTime != null; |
112 if (validStartTime) { |
112 if (validStartTime) { |
113 path = repositoryFiles.firstPath(disp.startNanos); |
113 path = repositoryFiles.firstPath(disp.startNanos); |
114 } else { |
114 } else { |
123 long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration(); |
123 long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration(); |
124 long filterStart = validStartTime ? disp.startNanos : segmentStart; |
124 long filterStart = validStartTime ? disp.startNanos : segmentStart; |
125 long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE; |
125 long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE; |
126 |
126 |
127 while (!isClosed()) { |
127 while (!isClosed()) { |
128 boolean awaitnewEvent = false; |
|
129 while (!isClosed() && !currentParser.isChunkFinished()) { |
128 while (!isClosed() && !currentParser.isChunkFinished()) { |
130 disp = dispatcher(); |
129 disp = dispatcher(); |
131 ParserConfiguration pc = disp.parserConfiguration; |
130 if (disp != lastDisp) { |
132 pc.filterStart = filterStart; |
131 ParserConfiguration pc = disp.parserConfiguration; |
133 pc.filterEnd = filterEnd; |
132 pc.filterStart = filterStart; |
134 currentParser.updateConfiguration(pc, true); |
133 pc.filterEnd = filterEnd; |
135 currentParser.setFlushOperation(getFlushOperation()); |
134 currentParser.updateConfiguration(pc, true); |
136 if (pc.isOrdered()) { |
135 currentParser.setFlushOperation(getFlushOperation()); |
137 awaitnewEvent = processOrdered(disp, awaitnewEvent); |
136 lastDisp = disp; |
|
137 } |
|
138 if (disp.parserConfiguration.isOrdered()) { |
|
139 processOrdered(disp); |
138 } else { |
140 } else { |
139 awaitnewEvent = processUnordered(disp, awaitnewEvent); |
141 processUnordered(disp); |
140 } |
142 } |
141 if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) { |
143 if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) { |
142 close(); |
144 close(); |
143 return; |
145 return; |
144 } |
146 } |
180 return false; |
182 return false; |
181 } |
183 } |
182 return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos(); |
184 return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos(); |
183 } |
185 } |
184 |
186 |
185 private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException { |
187 private void processOrdered(Dispatcher c) throws IOException { |
186 if (sortedCache == null) { |
188 if (sortedCache == null) { |
187 sortedCache = new RecordedEvent[100_000]; |
189 sortedCache = new RecordedEvent[100_000]; |
188 } |
190 } |
189 int index = 0; |
191 int index = 0; |
190 while (true) { |
192 while (true) { |
191 RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents); |
193 RecordedEvent e = currentParser.readStreamingEvent(); |
192 if (e == null) { |
194 if (e == null) { |
193 // wait for new event with next call to |
|
194 // readStreamingEvent() |
|
195 awaitNewEvents = true; |
|
196 break; |
195 break; |
197 } |
196 } |
198 awaitNewEvents = false; |
|
199 if (index == sortedCache.length) { |
197 if (index == sortedCache.length) { |
200 sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2); |
198 sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2); |
201 } |
199 } |
202 sortedCache[index++] = e; |
200 sortedCache[index++] = e; |
203 } |
201 } |
204 |
|
205 // no events found |
202 // no events found |
206 if (index == 0 && currentParser.isChunkFinished()) { |
203 if (index == 0 && currentParser.isChunkFinished()) { |
207 return awaitNewEvents; |
204 return; |
208 } |
205 } |
209 // at least 2 events, sort them |
206 // at least 2 events, sort them |
210 if (index > 1) { |
207 if (index > 1) { |
211 Arrays.sort(sortedCache, 0, index, EVENT_COMPARATOR); |
208 Arrays.sort(sortedCache, 0, index, EVENT_COMPARATOR); |
212 } |
209 } |
213 for (int i = 0; i < index; i++) { |
210 for (int i = 0; i < index; i++) { |
214 c.dispatch(sortedCache[i]); |
211 c.dispatch(sortedCache[i]); |
215 } |
212 } |
216 return awaitNewEvents; |
213 return; |
217 } |
214 } |
218 |
215 |
219 private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException { |
216 private boolean processUnordered(Dispatcher c) throws IOException { |
220 while (true) { |
217 while (true) { |
221 RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents); |
218 RecordedEvent e = currentParser.readStreamingEvent(); |
222 if (e == null) { |
219 if (e == null) { |
223 return true; |
220 return true; |
224 } else { |
221 } else { |
225 c.dispatch(e); |
222 c.dispatch(e); |
226 } |
223 } |