60 } |
60 } |
61 |
61 |
62 @Override |
62 @Override |
63 public void close() { |
63 public void close() { |
64 setClosed(true); |
64 setClosed(true); |
65 runCloseActions(); |
65 dispatcher().runCloseActions(); |
66 try { |
66 try { |
67 input.close(); |
67 input.close(); |
68 } catch (IOException e) { |
68 } catch (IOException e) { |
69 // ignore |
69 // ignore |
70 } |
70 } |
71 } |
71 } |
72 |
72 |
73 @Override |
73 @Override |
74 protected void process() throws IOException { |
74 protected void process() throws IOException { |
75 StreamConfiguration c = configuration; |
75 Dispatcher disp = dispatcher(); |
76 long start = 0; |
76 long start = 0; |
77 long end = Long.MAX_VALUE; |
77 long end = Long.MAX_VALUE; |
78 if (c.getStartTime() != null) { |
78 if (disp.startTime != null) { |
79 start = c.getStartNanos(); |
79 start = disp.startNanos; |
80 } |
80 } |
81 if (c.getEndTime() != null) { |
81 if (disp.endTime != null) { |
82 end = c.getEndNanos(); |
82 end = disp.endNanos; |
83 } |
83 } |
84 |
84 |
85 chunkParser = new ChunkParser(input, c.getReuse()); |
85 chunkParser = new ChunkParser(input, disp.parserConfiguration); |
86 while (!isClosed()) { |
86 while (!isClosed()) { |
87 if (chunkParser.getStartNanos() > end) { |
87 if (chunkParser.getStartNanos() > end) { |
88 close(); |
88 close(); |
89 return; |
89 return; |
90 } |
90 } |
91 c = configuration; |
91 disp = dispatcher(); |
92 boolean ordered = c.getOrdered(); |
92 disp.parserConfiguration.filterStart = start; |
|
93 disp.parserConfiguration.filterEnd = end; |
|
94 chunkParser.updateConfiguration(disp.parserConfiguration, true); |
93 chunkParser.setFlushOperation(getFlushOperation()); |
95 chunkParser.setFlushOperation(getFlushOperation()); |
94 chunkParser.setFilterStart(start); |
96 if (disp.parserConfiguration.ordered) { |
95 chunkParser.setFilterEnd(end); |
97 processOrdered(disp); |
96 chunkParser.setReuse(c.getReuse()); |
|
97 chunkParser.setOrdered(ordered); |
|
98 chunkParser.resetEventCache(); |
|
99 chunkParser.setParserFilter(c.getFiler()); |
|
100 chunkParser.updateEventParsers(); |
|
101 c.clearDispatchCache(); |
|
102 if (ordered) { |
|
103 processOrdered(c); |
|
104 } else { |
98 } else { |
105 processUnordered(c); |
99 processUnordered(disp); |
106 } |
100 } |
107 if (chunkParser.isLastChunk()) { |
101 if (isClosed() || chunkParser.isLastChunk()) { |
108 return; |
102 return; |
109 } |
103 } |
110 chunkParser = chunkParser.nextChunkParser(); |
104 chunkParser = chunkParser.nextChunkParser(); |
111 } |
105 } |
112 } |
106 } |
113 |
107 |
114 private void processOrdered(StreamConfiguration c) throws IOException { |
108 private void processOrdered(Dispatcher c) throws IOException { |
115 if (sortedList == null) { |
109 if (sortedList == null) { |
116 sortedList = new RecordedEvent[10_000]; |
110 sortedList = new RecordedEvent[10_000]; |
117 } |
111 } |
118 RecordedEvent event; |
112 RecordedEvent event; |
119 int index = 0; |
113 int index = 0; |
120 while (true) { |
114 while (true) { |
121 event = chunkParser.readEvent(); |
115 event = chunkParser.readEvent(); |
122 if (event == null) { |
116 if (event == null) { |
123 Arrays.sort(sortedList, 0, index, END_TIME); |
117 Arrays.sort(sortedList, 0, index, END_TIME); |
124 for (int i = 0; i < index; i++) { |
118 for (int i = 0; i < index; i++) { |
125 dispatch(c, sortedList[i]); |
119 c.dispatch(sortedList[i]); |
126 } |
120 } |
127 return; |
121 return; |
128 } |
122 } |
129 if (index == sortedList.length) { |
123 if (index == sortedList.length) { |
130 RecordedEvent[] tmp = sortedList; |
124 RecordedEvent[] tmp = sortedList; |