src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branch JEP-349-branch
changeset 57449 099789ceff7d
parent 57433 83e4343a6984
child 57452 6fabe73e5d9a
equal deleted inserted replaced
57434:216bf2e3b542 57449:099789ceff7d
    33 import java.util.Arrays;
    33 import java.util.Arrays;
    34 import java.util.Comparator;
    34 import java.util.Comparator;
    35 import java.util.Objects;
    35 import java.util.Objects;
    36 import java.util.function.Consumer;
    36 import java.util.function.Consumer;
    37 
    37 
    38 import jdk.jfr.internal.consumer.EventConsumer;
       
    39 import jdk.jfr.internal.consumer.RecordingInput;
    38 import jdk.jfr.internal.consumer.RecordingInput;
    40 import jdk.jfr.internal.consumer.RepositoryFiles;
    39 import jdk.jfr.internal.consumer.RepositoryFiles;
    41 
    40 
    42 /**
    41 /**
    43  * Implementation of an {@code EventStream} that operates against a directory
    42  * Implementation of an {@code EventStream} that operates against a directory
    44  * with chunk files.
    43  * with chunk files.
    45  *
    44  *
    46  */
    45  */
    47 final class EventDirectoryStream implements EventStream {
    46 final class EventDirectoryStream implements EventStream {
    48 
    47 
    49     static final class ParserConsumer extends EventConsumer {
    48     static final class DirectoryConsumer extends EventConsumer {
    50 
    49 
    51         private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
    50         private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
    52         private static final int DEFAULT_ARRAY_SIZE = 10_000;
    51         private static final int DEFAULT_ARRAY_SIZE = 10_000;
    53         private final RepositoryFiles repositoryFiles;
    52         private final RepositoryFiles repositoryFiles;
    54         private ChunkParser chunkParser;
    53         private ChunkParser chunkParser;
    55         private RecordedEvent[] sortedList;
    54         private RecordedEvent[] sortedList;
    56 
    55         protected long chunkStartNanos;
    57         public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
    56 
       
    57         public DirectoryConsumer(AccessControlContext acc, Path p) throws IOException {
    58             super(acc);
    58             super(acc);
    59             repositoryFiles = new RepositoryFiles(p);
    59             repositoryFiles = new RepositoryFiles(p);
    60         }
    60         }
    61 
    61 
    62         @Override
    62         @Override
    63         public void process() throws IOException {
    63         public void process() throws IOException {
    64             Path path = repositoryFiles.nextPath(startNanos);
    64             chunkStartNanos = startNanos;
    65             startNanos = repositoryFiles.getTimestamp(path) + 1;
    65             Path path;
       
    66             if (startTime == EventConsumer.NEXT_EVENT) {
       
    67                 // TODO: Need to skip forward to the next event
       
    68                 // For now, use the last chunk.
       
    69                 path = repositoryFiles.lastPath();
       
    70             } else {
       
    71                 path = repositoryFiles.nextPath(chunkStartNanos);
       
    72             }
       
    73             chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
    66             try (RecordingInput input = new RecordingInput(path.toFile())) {
    74             try (RecordingInput input = new RecordingInput(path.toFile())) {
    67                 chunkParser = new ChunkParser(input, this.reuse);
    75                 chunkParser = new ChunkParser(input, this.reuse);
    68                 while (!isClosed()) {
    76                 while (!isClosed()) {
    69                     boolean awaitnewEvent = false;
    77                     boolean awaitnewEvent = false;
    70                     while (!isClosed() && !chunkParser.isChunkFinished()) {
    78                     while (!isClosed() && !chunkParser.isChunkFinished()) {
    79                             awaitnewEvent = processUnordered(awaitnewEvent);
    87                             awaitnewEvent = processUnordered(awaitnewEvent);
    80                         }
    88                         }
    81                         runFlushActions();
    89                         runFlushActions();
    82                     }
    90                     }
    83 
    91 
    84                     path = repositoryFiles.nextPath(startNanos);
    92                     path = repositoryFiles.nextPath(chunkStartNanos);
    85                     if (path == null) {
    93                     if (path == null) {
    86                         return; // stream closed
    94                         return; // stream closed
    87                     }
    95                     }
    88                     startNanos = repositoryFiles.getTimestamp(path) + 1;
    96                     chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
    89                     input.setFile(path);
    97                     input.setFile(path);
    90                     chunkParser = chunkParser.newChunkParser();
    98                     chunkParser = chunkParser.newChunkParser();
    91                 }
    99                 }
    92             }
   100             }
    93         }
   101         }
   145         }
   153         }
   146     }
   154     }
   147 
   155 
   148     private final EventConsumer eventConsumer;
   156     private final EventConsumer eventConsumer;
   149 
   157 
   150     public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException {
   158     public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException {
   151         eventConsumer = new ParserConsumer(acc, p);
   159         eventConsumer = new DirectoryConsumer(acc, p);
       
   160         eventConsumer.startTime = startTime;
   152     }
   161     }
   153 
   162 
   154     public void close() {
   163     public void close() {
   155         eventConsumer.close();
   164         eventConsumer.close();
   156     }
   165     }
   221 
   230 
   222     @Override
   231     @Override
   223     public void setOrdered(boolean ordered) {
   232     public void setOrdered(boolean ordered) {
   224         eventConsumer.setOrdered(ordered);
   233         eventConsumer.setOrdered(ordered);
   225     }
   234     }
       
   235 
       
   236     @Override
       
   237     public void setStartTime(Instant startTime) {
       
   238         eventConsumer.setStartTime(startTime);
       
   239     }
   226 }
   240 }