src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branch JEP-349-branch
changeset 58129 7b751fe181a5
parent 58020 f082177c5023
equal deleted inserted replaced
58121:6f8f18ac1d54 58129:7b751fe181a5
    30 import java.security.AccessControlContext;
    30 import java.security.AccessControlContext;
    31 import java.time.Instant;
    31 import java.time.Instant;
    32 import java.util.Arrays;
    32 import java.util.Arrays;
    33 import java.util.Objects;
    33 import java.util.Objects;
    34 
    34 
       
    35 import jdk.jfr.consumer.ChunkParser.ParserConfiguration;
    35 import jdk.jfr.internal.Utils;
    36 import jdk.jfr.internal.Utils;
    36 import jdk.jfr.internal.consumer.FileAccess;
    37 import jdk.jfr.internal.consumer.FileAccess;
    37 import jdk.jfr.internal.consumer.RecordingInput;
    38 import jdk.jfr.internal.consumer.RecordingInput;
    38 import jdk.jfr.internal.consumer.RepositoryFiles;
    39 import jdk.jfr.internal.consumer.RepositoryFiles;
    39 
    40 
    44  */
    45  */
    45 final class EventDirectoryStream extends AbstractEventStream {
    46 final class EventDirectoryStream extends AbstractEventStream {
    46     private final RepositoryFiles repositoryFiles;
    47     private final RepositoryFiles repositoryFiles;
    47     private final boolean active;
    48     private final boolean active;
    48     private final FileAccess fileAccess;
    49     private final FileAccess fileAccess;
       
    50 
    49     private ChunkParser chunkParser;
    51     private ChunkParser chunkParser;
    50     private long chunkStartNanos;
    52     private long chunkStartNanos;
    51     private RecordedEvent[] sortedList;
    53     private RecordedEvent[] sortedList;
    52 
    54 
    53     EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
    55     EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
    58     }
    60     }
    59 
    61 
    60     @Override
    62     @Override
    61     public void close() {
    63     public void close() {
    62         setClosed(true);
    64         setClosed(true);
    63         runCloseActions();
    65         dispatcher().runCloseActions();
    64         repositoryFiles.close();
    66         repositoryFiles.close();
    65     }
    67     }
    66 
    68 
    67     @Override
    69     @Override
    68     public void start() {
    70     public void start() {
    74         startAsync(Utils.timeToNanos(Instant.now()));
    76         startAsync(Utils.timeToNanos(Instant.now()));
    75     }
    77     }
    76 
    78 
    77     @Override
    79     @Override
    78     protected void process() throws Exception {
    80     protected void process() throws Exception {
    79         StreamConfiguration c = configuration;
    81         Dispatcher disp = dispatcher();
       
    82 
    80         Path path;
    83         Path path;
    81         boolean validStartTime = active || c.getStartTime() != null;
    84         boolean validStartTime = active || disp.startTime != null;
    82         if (validStartTime) {
    85         if (validStartTime) {
    83             path = repositoryFiles.firstPath(c.getStartNanos());
    86             path = repositoryFiles.firstPath(disp.startNanos);
    84         } else {
    87         } else {
    85             path = repositoryFiles.lastPath();
    88             path = repositoryFiles.lastPath();
    86         }
    89         }
    87         if (path == null) { // closed
    90         if (path == null) { // closed
    88             return;
    91             return;
    89         }
    92         }
    90         chunkStartNanos = repositoryFiles.getTimestamp(path);
    93         chunkStartNanos = repositoryFiles.getTimestamp(path);
    91         try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
    94         try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
    92             chunkParser = new ChunkParser(input, c.getReuse());
    95             chunkParser = new ChunkParser(input, disp.parserConfiguration);
    93             long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
    96             long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
    94             long filtertStart = validStartTime ? c.getStartNanos() : segmentStart;
    97             long filterStart = validStartTime ? disp.startNanos : segmentStart;
    95             long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE;
    98             long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
       
    99 
    96             while (!isClosed()) {
   100             while (!isClosed()) {
    97                 boolean awaitnewEvent = false;
   101                 boolean awaitnewEvent = false;
    98                 while (!isClosed() && !chunkParser.isChunkFinished()) {
   102                 while (!isClosed() && !chunkParser.isChunkFinished()) {
    99                     c = configuration;
   103                     disp = dispatcher();
   100                     boolean ordered = c.getOrdered();
   104                     ParserConfiguration pc = disp.parserConfiguration;
       
   105                     pc.filterStart = filterStart;
       
   106                     pc.filterEnd = filterEnd;
       
   107                     chunkParser.updateConfiguration(pc, true);
   101                     chunkParser.setFlushOperation(getFlushOperation());
   108                     chunkParser.setFlushOperation(getFlushOperation());
   102                     chunkParser.setReuse(c.getReuse());
   109                     if (pc.ordered) {
   103                     chunkParser.setOrdered(ordered);
   110                         awaitnewEvent = processOrdered(disp, awaitnewEvent);
   104                     chunkParser.setFilterStart(filtertStart);
       
   105                     chunkParser.setFilterEnd(filterEnd);
       
   106                     chunkParser.resetEventCache();
       
   107                     chunkParser.setParserFilter(c.getFilter());
       
   108                     chunkParser.updateEventParsers();
       
   109                     c.clearDispatchCache();
       
   110                     if (ordered) {
       
   111                         awaitnewEvent = processOrdered(c, awaitnewEvent);
       
   112                     } else {
   111                     } else {
   113                         awaitnewEvent = processUnordered(c, awaitnewEvent);
   112                         awaitnewEvent = processUnordered(disp, awaitnewEvent);
   114                     }
   113                     }
   115                     if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
   114                     if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
   116                         close();
   115                         close();
   117                         return;
   116                         return;
   118                     }
   117                     }
   133                 // Could set start = 0;
   132                 // Could set start = 0;
   134             }
   133             }
   135         }
   134         }
   136     }
   135     }
   137 
   136 
   138     private boolean processOrdered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
   137     private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
   139         if (sortedList == null) {
   138         if (sortedList == null) {
   140             sortedList = new RecordedEvent[100_000];
   139             sortedList = new RecordedEvent[100_000];
   141         }
   140         }
   142         int index = 0;
   141         int index = 0;
   143         while (true) {
   142         while (true) {
   162         // at least 2 events, sort them
   161         // at least 2 events, sort them
   163         if (index > 1) {
   162         if (index > 1) {
   164             Arrays.sort(sortedList, 0, index, END_TIME);
   163             Arrays.sort(sortedList, 0, index, END_TIME);
   165         }
   164         }
   166         for (int i = 0; i < index; i++) {
   165         for (int i = 0; i < index; i++) {
   167             dispatch(c, sortedList[i]);
   166             c.dispatch(sortedList[i]);
   168         }
   167         }
   169         return awaitNewEvents;
   168         return awaitNewEvents;
   170     }
   169     }
   171 
   170 
   172     private boolean processUnordered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
   171     private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
   173         while (true) {
   172         while (true) {
   174             RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
   173             RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
   175             if (e == null) {
   174             if (e == null) {
   176                 return true;
   175                 return true;
   177             } else {
   176             } else {
   178                 dispatch(c, e);
   177                 c.dispatch(e);
   179             }
   178             }
   180         }
   179         }
   181     }
   180     }
   182 }
   181 }