src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java
changeset 59226 a0f39cc47387
parent 58863 c16ac7a2eba4
child 59310 72f3dd43dd28
equal deleted inserted replaced
59225:80e1201f6c9a 59226:a0f39cc47387
    33 import java.util.Comparator;
    33 import java.util.Comparator;
    34 import java.util.Objects;
    34 import java.util.Objects;
    35 
    35 
    36 import jdk.jfr.consumer.RecordedEvent;
    36 import jdk.jfr.consumer.RecordedEvent;
    37 import jdk.jfr.internal.JVM;
    37 import jdk.jfr.internal.JVM;
       
    38 import jdk.jfr.internal.PlatformRecording;
    38 import jdk.jfr.internal.Utils;
    39 import jdk.jfr.internal.Utils;
    39 import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration;
    40 import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration;
    40 
    41 
    41 /**
    42 /**
    42  * Implementation of an {@code EventStream}} that operates against a directory
    43  * Implementation of an {@code EventStream}} that operates against a directory
    43  * with chunk files.
    44  * with chunk files.
    44  *
    45  *
    45  */
    46  */
    46 public final class EventDirectoryStream extends AbstractEventStream {
    47 public class EventDirectoryStream extends AbstractEventStream {
    47 
    48 
    48     private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator();
    49     private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator();
    49 
    50 
    50     private final RepositoryFiles repositoryFiles;
    51     private final RepositoryFiles repositoryFiles;
    51     private final boolean active;
    52     private final PlatformRecording recording;
    52     private final FileAccess fileAccess;
    53     private final FileAccess fileAccess;
    53 
    54 
    54     private ChunkParser currentParser;
    55     private ChunkParser currentParser;
    55     private long currentChunkStartNanos;
    56     private long currentChunkStartNanos;
    56     private RecordedEvent[] sortedCache;
    57     private RecordedEvent[] sortedCache;
    57     private int threadExclusionLevel = 0;
    58     private int threadExclusionLevel = 0;
    58 
    59 
    59     public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
    60     public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, PlatformRecording recording) throws IOException {
    60         super(acc, active);
    61         super(acc, recording);
    61         this.fileAccess = Objects.requireNonNull(fileAccess);
    62         this.fileAccess = Objects.requireNonNull(fileAccess);
    62         this.active = active;
    63         this.recording = recording;
    63         this.repositoryFiles = new RepositoryFiles(fileAccess, p);
    64         this.repositoryFiles = new RepositoryFiles(fileAccess, p);
    64     }
    65     }
    65 
    66 
    66     @Override
    67     @Override
    67     public void close() {
    68     public void close() {
   102 
   103 
   103     protected void processRecursionSafe() throws IOException {
   104     protected void processRecursionSafe() throws IOException {
   104         Dispatcher disp = dispatcher();
   105         Dispatcher disp = dispatcher();
   105 
   106 
   106         Path path;
   107         Path path;
   107         boolean validStartTime = active || disp.startTime != null;
   108         boolean validStartTime = recording != null || disp.startTime != null;
   108         if (validStartTime) {
   109         if (validStartTime) {
   109             path = repositoryFiles.firstPath(disp.startNanos);
   110             path = repositoryFiles.firstPath(disp.startNanos);
   110         } else {
   111         } else {
   111             path = repositoryFiles.lastPath();
   112             path = repositoryFiles.lastPath();
   112         }
   113         }
   137                     if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
   138                     if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
   138                         close();
   139                         close();
   139                         return;
   140                         return;
   140                     }
   141                     }
   141                 }
   142                 }
   142 
   143                 if (isLastChunk()) {
       
   144                     // Recording was stopped/closed externally, and no more data to process.
       
   145                     return;
       
   146                 }
       
   147 
       
   148                 if (repositoryFiles.hasFixedPath() && currentParser.isFinalChunk()) {
       
   149                     // JVM process exited/crashed, or repository migrated to an unknown location
       
   150                     return;
       
   151                 }
   143                 if (isClosed()) {
   152                 if (isClosed()) {
       
   153                     // Stream was closed
   144                     return;
   154                     return;
   145                 }
   155                 }
   146                 long durationNanos = currentParser.getChunkDuration();
   156                 long durationNanos = currentParser.getChunkDuration();
   147                 if (durationNanos == 0) {
   157                 if (durationNanos == 0) {
   148                     // Avoid reading the same chunk again and again if
   158                     // Avoid reading the same chunk again and again if
   158                 currentParser = currentParser.newChunkParser();
   168                 currentParser = currentParser.newChunkParser();
   159                 // TODO: Optimization. No need filter when we reach new chunk
   169                 // TODO: Optimization. No need filter when we reach new chunk
   160                 // Could set start = 0;
   170                 // Could set start = 0;
   161             }
   171             }
   162         }
   172         }
       
   173     }
       
   174 
       
   175     private boolean isLastChunk() {
       
   176         if (recording == null) {
       
   177             return false;
       
   178         }
       
   179         return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos();
   163     }
   180     }
   164 
   181 
   165     private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
   182     private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
   166         if (sortedCache == null) {
   183         if (sortedCache == null) {
   167             sortedCache = new RecordedEvent[100_000];
   184             sortedCache = new RecordedEvent[100_000];
   204             } else {
   221             } else {
   205                 c.dispatch(e);
   222                 c.dispatch(e);
   206             }
   223             }
   207         }
   224         }
   208     }
   225     }
       
   226 
   209 }
   227 }