src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java
changeset 59327 2c3578aa0bdf
parent 59310 72f3dd43dd28
equal deleted inserted replaced
59326:851a389fc54d 59327:2c3578aa0bdf
   103             }
   103             }
   104         }
   104         }
   105     }
   105     }
   106 
   106 
   107     protected void processRecursionSafe() throws IOException {
   107     protected void processRecursionSafe() throws IOException {
       
   108         Dispatcher lastDisp = null;
   108         Dispatcher disp = dispatcher();
   109         Dispatcher disp = dispatcher();
   109 
       
   110         Path path;
   110         Path path;
   111         boolean validStartTime = recording != null || disp.startTime != null;
   111         boolean validStartTime = recording != null || disp.startTime != null;
   112         if (validStartTime) {
   112         if (validStartTime) {
   113             path = repositoryFiles.firstPath(disp.startNanos);
   113             path = repositoryFiles.firstPath(disp.startNanos);
   114         } else {
   114         } else {
   123             long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration();
   123             long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration();
   124             long filterStart = validStartTime ? disp.startNanos : segmentStart;
   124             long filterStart = validStartTime ? disp.startNanos : segmentStart;
   125             long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
   125             long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
   126 
   126 
   127             while (!isClosed()) {
   127             while (!isClosed()) {
   128                 boolean awaitnewEvent = false;
       
   129                 while (!isClosed() && !currentParser.isChunkFinished()) {
   128                 while (!isClosed() && !currentParser.isChunkFinished()) {
   130                     disp = dispatcher();
   129                     disp = dispatcher();
   131                     ParserConfiguration pc = disp.parserConfiguration;
   130                     if (disp != lastDisp) {
   132                     pc.filterStart = filterStart;
   131                         ParserConfiguration pc = disp.parserConfiguration;
   133                     pc.filterEnd = filterEnd;
   132                         pc.filterStart = filterStart;
   134                     currentParser.updateConfiguration(pc, true);
   133                         pc.filterEnd = filterEnd;
   135                     currentParser.setFlushOperation(getFlushOperation());
   134                         currentParser.updateConfiguration(pc, true);
   136                     if (pc.isOrdered()) {
   135                         currentParser.setFlushOperation(getFlushOperation());
   137                         awaitnewEvent = processOrdered(disp, awaitnewEvent);
   136                         lastDisp = disp;
       
   137                     }
       
   138                     if (disp.parserConfiguration.isOrdered()) {
       
   139                         processOrdered(disp);
   138                     } else {
   140                     } else {
   139                         awaitnewEvent = processUnordered(disp, awaitnewEvent);
   141                         processUnordered(disp);
   140                     }
   142                     }
   141                     if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
   143                     if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
   142                         close();
   144                         close();
   143                         return;
   145                         return;
   144                     }
   146                     }
   180             return false;
   182             return false;
   181         }
   183         }
   182         return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos();
   184         return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos();
   183     }
   185     }
   184 
   186 
   185     private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
   187     private void processOrdered(Dispatcher c) throws IOException {
   186         if (sortedCache == null) {
   188         if (sortedCache == null) {
   187             sortedCache = new RecordedEvent[100_000];
   189             sortedCache = new RecordedEvent[100_000];
   188         }
   190         }
   189         int index = 0;
   191         int index = 0;
   190         while (true) {
   192         while (true) {
   191             RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
   193             RecordedEvent e = currentParser.readStreamingEvent();
   192             if (e == null) {
   194             if (e == null) {
   193                 // wait for new event with next call to
       
   194                 // readStreamingEvent()
       
   195                 awaitNewEvents = true;
       
   196                 break;
   195                 break;
   197             }
   196             }
   198             awaitNewEvents = false;
       
   199             if (index == sortedCache.length) {
   197             if (index == sortedCache.length) {
   200                 sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2);
   198                 sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2);
   201             }
   199             }
   202             sortedCache[index++] = e;
   200             sortedCache[index++] = e;
   203         }
   201         }
   204 
       
   205         // no events found
   202         // no events found
   206         if (index == 0 && currentParser.isChunkFinished()) {
   203         if (index == 0 && currentParser.isChunkFinished()) {
   207             return awaitNewEvents;
   204             return;
   208         }
   205         }
   209         // at least 2 events, sort them
   206         // at least 2 events, sort them
   210         if (index > 1) {
   207         if (index > 1) {
   211             Arrays.sort(sortedCache, 0, index, EVENT_COMPARATOR);
   208             Arrays.sort(sortedCache, 0, index, EVENT_COMPARATOR);
   212         }
   209         }
   213         for (int i = 0; i < index; i++) {
   210         for (int i = 0; i < index; i++) {
   214             c.dispatch(sortedCache[i]);
   211             c.dispatch(sortedCache[i]);
   215         }
   212         }
   216         return awaitNewEvents;
   213         return;
   217     }
   214     }
   218 
   215 
   219     private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
   216     private boolean processUnordered(Dispatcher c) throws IOException {
   220         while (true) {
   217         while (true) {
   221             RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
   218             RecordedEvent e = currentParser.readStreamingEvent();
   222             if (e == null) {
   219             if (e == null) {
   223                 return true;
   220                 return true;
   224             } else {
   221             } else {
   225                 c.dispatch(e);
   222                 c.dispatch(e);
   226             }
   223             }