src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
branchJEP-349-branch
changeset 58129 7b751fe181a5
parent 58020 f082177c5023
equal deleted inserted replaced
58121:6f8f18ac1d54 58129:7b751fe181a5
    60     }
    60     }
    61 
    61 
    62     @Override
    62     @Override
    63     public void close() {
    63     public void close() {
    64         setClosed(true);
    64         setClosed(true);
    65         runCloseActions();
    65         dispatcher().runCloseActions();
    66         try {
    66         try {
    67             input.close();
    67             input.close();
    68         } catch (IOException e) {
    68         } catch (IOException e) {
    69             // ignore
    69             // ignore
    70         }
    70         }
    71     }
    71     }
    72 
    72 
    73     @Override
    73     @Override
    74     protected void process() throws IOException {
    74     protected void process() throws IOException {
    75         StreamConfiguration c = configuration;
    75         Dispatcher disp = dispatcher();
    76         long start = 0;
    76         long start = 0;
    77         long end = Long.MAX_VALUE;
    77         long end = Long.MAX_VALUE;
    78         if (c.getStartTime() != null) {
    78         if (disp.startTime != null) {
    79             start = c.getStartNanos();
    79             start = disp.startNanos;
    80         }
    80         }
    81         if (c.getEndTime() != null) {
    81         if (disp.endTime != null) {
    82             end = c.getEndNanos();
    82             end = disp.endNanos;
    83         }
    83         }
    84 
    84 
    85         chunkParser = new ChunkParser(input, c.getReuse());
    85         chunkParser = new ChunkParser(input, disp.parserConfiguration);
    86         while (!isClosed()) {
    86         while (!isClosed()) {
    87             if (chunkParser.getStartNanos() > end) {
    87             if (chunkParser.getStartNanos() > end) {
    88                 close();
    88                 close();
    89                 return;
    89                 return;
    90             }
    90             }
    91             c = configuration;
    91             disp = dispatcher();
    92             boolean ordered = c.getOrdered();
    92             disp.parserConfiguration.filterStart = start;
       
    93             disp.parserConfiguration.filterEnd = end;
       
    94             chunkParser.updateConfiguration(disp.parserConfiguration, true);
    93             chunkParser.setFlushOperation(getFlushOperation());
    95             chunkParser.setFlushOperation(getFlushOperation());
    94             chunkParser.setFilterStart(start);
    96             if (disp.parserConfiguration.ordered) {
    95             chunkParser.setFilterEnd(end);
    97                 processOrdered(disp);
    96             chunkParser.setReuse(c.getReuse());
       
    97             chunkParser.setOrdered(ordered);
       
    98             chunkParser.resetEventCache();
       
    99             chunkParser.setParserFilter(c.getFiler());
       
   100             chunkParser.updateEventParsers();
       
   101             c.clearDispatchCache();
       
   102             if (ordered) {
       
   103                 processOrdered(c);
       
   104             } else {
    98             } else {
   105                 processUnordered(c);
    99                 processUnordered(disp);
   106             }
   100             }
   107             if (chunkParser.isLastChunk()) {
   101             if (isClosed() || chunkParser.isLastChunk()) {
   108                 return;
   102                 return;
   109             }
   103             }
   110             chunkParser = chunkParser.nextChunkParser();
   104             chunkParser = chunkParser.nextChunkParser();
   111         }
   105         }
   112     }
   106     }
   113 
   107 
   114     private void processOrdered(StreamConfiguration c) throws IOException {
   108     private void processOrdered(Dispatcher c) throws IOException {
   115         if (sortedList == null) {
   109         if (sortedList == null) {
   116             sortedList = new RecordedEvent[10_000];
   110             sortedList = new RecordedEvent[10_000];
   117         }
   111         }
   118         RecordedEvent event;
   112         RecordedEvent event;
   119         int index = 0;
   113         int index = 0;
   120         while (true) {
   114         while (true) {
   121             event = chunkParser.readEvent();
   115             event = chunkParser.readEvent();
   122             if (event == null) {
   116             if (event == null) {
   123                 Arrays.sort(sortedList, 0, index, END_TIME);
   117                 Arrays.sort(sortedList, 0, index, END_TIME);
   124                 for (int i = 0; i < index; i++) {
   118                 for (int i = 0; i < index; i++) {
   125                     dispatch(c, sortedList[i]);
   119                     c.dispatch(sortedList[i]);
   126                 }
   120                 }
   127                 return;
   121                 return;
   128             }
   122             }
   129             if (index == sortedList.length) {
   123             if (index == sortedList.length) {
   130                 RecordedEvent[] tmp = sortedList;
   124                 RecordedEvent[] tmp = sortedList;
   133             }
   127             }
   134             sortedList[index++] = event;
   128             sortedList[index++] = event;
   135         }
   129         }
   136     }
   130     }
   137 
   131 
   138     private void processUnordered(StreamConfiguration c) throws IOException {
   132     private void processUnordered(Dispatcher c) throws IOException {
   139         while (!isClosed()) {
   133         while (!isClosed()) {
   140             RecordedEvent event = chunkParser.readEvent();
   134             RecordedEvent event = chunkParser.readEvent();
   141             if (event == null) {
   135             if (event == null) {
   142                 return;
   136                 return;
   143             }
   137             }
   144             dispatch(c, event);
   138             c.dispatch(event);
   145         }
   139         }
   146     }
   140     }
   147 }
   141 }