26 package jdk.jfr.consumer; |
26 package jdk.jfr.consumer; |
27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
29 import java.nio.file.Path; |
29 import java.nio.file.Path; |
30 import java.security.AccessControlContext; |
30 import java.security.AccessControlContext; |
31 import java.security.AccessController; |
|
32 import java.time.Duration; |
|
33 import java.time.Instant; |
|
34 import java.util.Arrays; |
31 import java.util.Arrays; |
35 import java.util.Objects; |
32 import java.util.Objects; |
36 import java.util.function.Consumer; |
|
37 |
33 |
38 import jdk.jfr.internal.consumer.FileAccess; |
34 import jdk.jfr.internal.consumer.FileAccess; |
39 import jdk.jfr.internal.consumer.RecordingInput; |
35 import jdk.jfr.internal.consumer.RecordingInput; |
40 |
36 |
41 /** |
37 /** |
42 * Implementation of an event stream that operates against a recording file. |
38 * Implementation of an event stream that operates against a recording file. |
43 * |
39 * |
44 */ |
40 */ |
45 final class EventFileStream implements EventStream { |
41 final class EventFileStream extends AbstractEventStream { |
|
42 private final RecordingInput input; |
|
43 private ChunkParser chunkParser; |
|
44 private RecordedEvent[] sortedList; |
46 |
45 |
47 private final static class FileStream extends AbstractEventStream { |
46 public EventFileStream(AccessControlContext acc, Path path) throws IOException { |
48 private static final int DEFAULT_ARRAY_SIZE = 100_000; |
47 super(acc, false); |
49 |
|
50 private final RecordingInput input; |
|
51 |
|
52 private ChunkParser chunkParser; |
|
53 private RecordedEvent[] sortedList; |
|
54 |
|
55 public FileStream(AccessControlContext acc, Path path) throws IOException { |
|
56 super(acc, false); |
|
57 this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED); |
|
58 ; } |
|
59 |
|
60 @Override |
|
61 public void process() throws IOException { |
|
62 final StreamConfiguration c1 = configuration; |
|
63 long start = 0; |
|
64 long end = Long.MAX_VALUE; |
|
65 if (c1.getStartTime() != null) { |
|
66 start = c1.getStartNanos(); |
|
67 } |
|
68 if (c1.getEndTime() != null) { |
|
69 end = c1.getEndNanos(); |
|
70 } |
|
71 |
|
72 chunkParser = new ChunkParser(input, c1.getReuse()); |
|
73 while (!isClosed()) { |
|
74 if (chunkParser.getStartNanos() > end) { |
|
75 close(); |
|
76 return; |
|
77 } |
|
78 StreamConfiguration c2 = configuration; |
|
79 boolean ordered = c2.getOrdered(); |
|
80 chunkParser.setFlushOperation(flushOperation); |
|
81 chunkParser.setFirstNanos(start); |
|
82 chunkParser.setLastNanos(end); |
|
83 chunkParser.setReuse(c2.getReuse()); |
|
84 chunkParser.setOrdered(ordered); |
|
85 chunkParser.resetEventCache(); |
|
86 chunkParser.setParserFilter(c2.getFiler()); |
|
87 chunkParser.updateEventParsers(); |
|
88 clearLastDispatch(); |
|
89 if (ordered) { |
|
90 processOrdered(); |
|
91 } else { |
|
92 processUnordered(); |
|
93 } |
|
94 if (chunkParser.isLastChunk()) { |
|
95 return; |
|
96 } |
|
97 chunkParser = chunkParser.nextChunkParser(); |
|
98 } |
|
99 } |
|
100 |
|
101 private void processOrdered() throws IOException { |
|
102 if (sortedList == null) { |
|
103 sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE]; |
|
104 } |
|
105 RecordedEvent event; |
|
106 int index = 0; |
|
107 while (true) { |
|
108 event = chunkParser.readEvent(); |
|
109 if (event == null) { |
|
110 Arrays.sort(sortedList, 0, index, END_TIME); |
|
111 for (int i = 0; i < index; i++) { |
|
112 dispatch(sortedList[i]); |
|
113 } |
|
114 return; |
|
115 } |
|
116 if (index == sortedList.length) { |
|
117 RecordedEvent[] tmp = sortedList; |
|
118 sortedList = new RecordedEvent[2 * tmp.length]; |
|
119 System.arraycopy(tmp, 0, sortedList, 0, tmp.length); |
|
120 } |
|
121 sortedList[index++] = event; |
|
122 } |
|
123 } |
|
124 |
|
125 private void processUnordered() throws IOException { |
|
126 RecordedEvent event; |
|
127 while (!isClosed()) { |
|
128 event = chunkParser.readEvent(); |
|
129 if (event == null) { |
|
130 return; |
|
131 } |
|
132 dispatch(event); |
|
133 } |
|
134 } |
|
135 |
|
136 @Override |
|
137 public void close() { |
|
138 setClosed(true);; |
|
139 runCloseActions(); |
|
140 try { |
|
141 input.close(); |
|
142 } catch (IOException e) { |
|
143 // ignore |
|
144 } |
|
145 } |
|
146 } |
|
147 |
|
148 private final FileStream eventStream; |
|
149 |
|
150 public EventFileStream(Path path, Instant from, Instant to) throws IOException { |
|
151 Objects.requireNonNull(path); |
48 Objects.requireNonNull(path); |
152 eventStream = new FileStream(AccessController.getContext(), path); |
49 this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED); |
153 } |
50 } |
154 |
51 |
155 @Override |
52 @Override |
156 public void onEvent(Consumer<RecordedEvent> action) { |
53 public void start() { |
157 Objects.requireNonNull(action); |
54 start(0); |
158 eventStream.onEvent(action); |
|
159 } |
55 } |
160 |
56 |
161 @Override |
57 @Override |
162 public void onEvent(String eventName, Consumer<RecordedEvent> action) { |
58 public void startAsync() { |
163 Objects.requireNonNull(eventName); |
59 startAsync(0); |
164 Objects.requireNonNull(action); |
|
165 eventStream.onEvent(eventName, action); |
|
166 } |
|
167 |
|
168 @Override |
|
169 public void onFlush(Runnable action) { |
|
170 Objects.requireNonNull(action); |
|
171 eventStream.onFlush(action); |
|
172 } |
|
173 |
|
174 @Override |
|
175 public void onClose(Runnable action) { |
|
176 Objects.requireNonNull(action); |
|
177 eventStream.addCloseAction(action); |
|
178 } |
60 } |
179 |
61 |
180 @Override |
62 @Override |
181 public void close() { |
63 public void close() { |
182 eventStream.close(); |
64 setClosed(true); |
|
65 runCloseActions(); |
|
66 try { |
|
67 input.close(); |
|
68 } catch (IOException e) { |
|
69 // ignore |
|
70 } |
183 } |
71 } |
184 |
72 |
185 @Override |
73 @Override |
186 public boolean remove(Object action) { |
74 public void process() throws IOException { |
187 Objects.requireNonNull(action); |
75 StreamConfiguration c = configuration; |
188 return eventStream.remove(action); |
76 long start = 0; |
|
77 long end = Long.MAX_VALUE; |
|
78 if (c.getStartTime() != null) { |
|
79 start = c.getStartNanos(); |
|
80 } |
|
81 if (c.getEndTime() != null) { |
|
82 end = c.getEndNanos(); |
|
83 } |
|
84 |
|
85 chunkParser = new ChunkParser(input, c.getReuse()); |
|
86 while (!isClosed()) { |
|
87 if (chunkParser.getStartNanos() > end) { |
|
88 close(); |
|
89 return; |
|
90 } |
|
91 c = configuration; |
|
92 boolean ordered = c.getOrdered(); |
|
93 chunkParser.setFlushOperation(getFlushOperation()); |
|
94 chunkParser.setFilterStart(start); |
|
95 chunkParser.setFilterEnd(end); |
|
96 chunkParser.setReuse(c.getReuse()); |
|
97 chunkParser.setOrdered(ordered); |
|
98 chunkParser.resetEventCache(); |
|
99 chunkParser.setParserFilter(c.getFiler()); |
|
100 chunkParser.updateEventParsers(); |
|
101 clearLastDispatch(); |
|
102 if (ordered) { |
|
103 processOrdered(); |
|
104 } else { |
|
105 processUnordered(); |
|
106 } |
|
107 if (chunkParser.isLastChunk()) { |
|
108 return; |
|
109 } |
|
110 chunkParser = chunkParser.nextChunkParser(); |
|
111 } |
189 } |
112 } |
190 |
113 |
191 @Override |
114 private void processOrdered() throws IOException { |
192 public void start() { |
115 if (sortedList == null) { |
193 eventStream.start(0); |
116 sortedList = new RecordedEvent[10_000]; |
|
117 } |
|
118 RecordedEvent event; |
|
119 int index = 0; |
|
120 while (true) { |
|
121 event = chunkParser.readEvent(); |
|
122 if (event == null) { |
|
123 Arrays.sort(sortedList, 0, index, END_TIME); |
|
124 for (int i = 0; i < index; i++) { |
|
125 dispatch(sortedList[i]); |
|
126 } |
|
127 return; |
|
128 } |
|
129 if (index == sortedList.length) { |
|
130 RecordedEvent[] tmp = sortedList; |
|
131 sortedList = new RecordedEvent[2 * tmp.length]; |
|
132 System.arraycopy(tmp, 0, sortedList, 0, tmp.length); |
|
133 } |
|
134 sortedList[index++] = event; |
|
135 } |
194 } |
136 } |
195 |
137 |
196 @Override |
138 private void processUnordered() throws IOException { |
197 public void setReuse(boolean reuse) { |
139 while (!isClosed()) { |
198 eventStream.setReuse(reuse); |
140 RecordedEvent event = chunkParser.readEvent(); |
199 } |
141 if (event == null) { |
200 |
142 return; |
201 @Override |
143 } |
202 public void startAsync() { |
144 dispatch(event); |
203 eventStream.startAsync(0); |
145 } |
204 } |
|
205 |
|
206 @Override |
|
207 public void awaitTermination(Duration timeout) { |
|
208 Objects.requireNonNull(timeout); |
|
209 eventStream.awaitTermination(timeout); |
|
210 } |
|
211 |
|
212 @Override |
|
213 public void awaitTermination() { |
|
214 eventStream.awaitTermination(); |
|
215 } |
|
216 |
|
217 @Override |
|
218 public void setOrdered(boolean ordered) { |
|
219 eventStream.setOrdered(ordered); |
|
220 } |
|
221 |
|
222 @Override |
|
223 public void setStartTime(Instant startTime) { |
|
224 eventStream.setStartTime(startTime); |
|
225 } |
|
226 |
|
227 @Override |
|
228 public void setEndTime(Instant endTime) { |
|
229 eventStream.setEndTime(endTime); |
|
230 } |
|
231 |
|
232 @Override |
|
233 public void onError(Consumer<Throwable> action) { |
|
234 // TODO Auto-generated method stub |
|
235 |
|
236 } |
146 } |
237 } |
147 } |