66 public void process() throws IOException { |
66 public void process() throws IOException { |
67 StreamConfiguration c1 = configuration; |
67 StreamConfiguration c1 = configuration; |
68 chunkStartNanos = c1.getStartNanos(); |
68 chunkStartNanos = c1.getStartNanos(); |
69 Path path; |
69 Path path; |
70 if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) { |
70 if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) { |
71 // TODO: Need to skip forward to the next event |
71 // TODO: Need to wait for next segment to arrive and then |
72 // For now, use the last chunk. |
72 // use first event, but this will do for now. |
73 path = repositoryFiles.lastPath(); |
73 path = repositoryFiles.lastPath(); |
74 } else { |
74 } else { |
75 path = repositoryFiles.nextPath(chunkStartNanos); |
75 path = repositoryFiles.firstPath(chunkStartNanos); |
76 } |
76 } |
77 if (path == null) { // closed |
77 if (path == null) { // closed |
78 return; |
78 return; |
79 } |
79 } |
80 chunkStartNanos = repositoryFiles.getTimestamp(path) + 1; |
80 chunkStartNanos = repositoryFiles.getTimestamp(path); |
81 try (RecordingInput input = new RecordingInput(path.toFile())) { |
81 try (RecordingInput input = new RecordingInput(path.toFile())) { |
82 chunkParser = new ChunkParser(input, c1.getReuse()); |
82 chunkParser = new ChunkParser(input, c1.getReuse()); |
83 while (!isClosed()) { |
83 while (!isClosed()) { |
84 boolean awaitnewEvent = false; |
84 boolean awaitnewEvent = false; |
85 while (!isClosed() && !chunkParser.isChunkFinished()) { |
85 while (!isClosed() && !chunkParser.isChunkFinished()) { |