src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branchJEP-349-branch
changeset 57640 46a77fccd251
parent 57638 3b41affae2d2
child 57690 9316d02dd4a5
equal deleted inserted replaced
57638:3b41affae2d2 57640:46a77fccd251
    66         public void process() throws IOException {
    66         public void process() throws IOException {
    67             StreamConfiguration c1 = configuration;
    67             StreamConfiguration c1 = configuration;
    68             chunkStartNanos = c1.getStartNanos();
    68             chunkStartNanos = c1.getStartNanos();
    69             Path path;
    69             Path path;
    70             if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) {
    70             if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) {
    71                 // TODO: Need to skip forward to the next event
    71                 // TODO: Need to wait for next segment to arrive and then
    72                 // For now, use the last chunk.
    72                 // use first event, but this will do for.
    73                 path = repositoryFiles.lastPath();
    73                 path = repositoryFiles.lastPath();
    74             } else {
    74             } else {
    75                 path = repositoryFiles.nextPath(chunkStartNanos);
    75                 path = repositoryFiles.firstPath(chunkStartNanos);
    76             }
    76             }
    77             if (path == null) { // closed
    77             if (path == null) { // closed
    78                 return;
    78                 return;
    79             }
    79             }
    80             chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
    80             chunkStartNanos = repositoryFiles.getTimestamp(path);
    81             try (RecordingInput input = new RecordingInput(path.toFile())) {
    81             try (RecordingInput input = new RecordingInput(path.toFile())) {
    82                 chunkParser = new ChunkParser(input, c1.getReuse());
    82                 chunkParser = new ChunkParser(input, c1.getReuse());
    83                 while (!isClosed()) {
    83                 while (!isClosed()) {
    84                     boolean awaitnewEvent = false;
    84                     boolean awaitnewEvent = false;
    85                     while (!isClosed() && !chunkParser.isChunkFinished()) {
    85                     while (!isClosed() && !chunkParser.isChunkFinished()) {
   100                         runFlushActions();
   100                         runFlushActions();
   101                     }
   101                     }
   102                     if (isClosed()) {
   102                     if (isClosed()) {
   103                         return;
   103                         return;
   104                     }
   104                     }
   105                     path = repositoryFiles.nextPath(chunkStartNanos);
   105                     long durationNanos = chunkParser.getChunkDuration();
       
   106                     path = repositoryFiles.nextPath(chunkStartNanos + durationNanos);
   106                     if (path == null) {
   107                     if (path == null) {
   107                         return; // stream closed
   108                         return; // stream closed
   108                     }
   109                     }
   109                     chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
   110                     chunkStartNanos = repositoryFiles.getTimestamp(path);
   110                     input.setFile(path);
   111                     input.setFile(path);
   111                     chunkParser = chunkParser.newChunkParser();
   112                     chunkParser = chunkParser.newChunkParser();
   112                 }
   113                 }
   113             }
   114             }
   114         }
   115         }