src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branchJEP-349-branch
changeset 57985 be121cbf3284
parent 57971 aa7b1ea52413
child 58020 f082177c5023
equal deleted inserted replaced
57984:269bbe414580 57985:be121cbf3284
    26 package jdk.jfr.consumer;
    26 package jdk.jfr.consumer;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
    29 import java.nio.file.Path;
    29 import java.nio.file.Path;
    30 import java.security.AccessControlContext;
    30 import java.security.AccessControlContext;
    31 import java.time.Duration;
       
    32 import java.time.Instant;
    31 import java.time.Instant;
    33 import java.util.Arrays;
    32 import java.util.Arrays;
    34 import java.util.Comparator;
       
    35 import java.util.Objects;
    33 import java.util.Objects;
    36 import java.util.function.Consumer;
       
    37 
    34 
    38 import jdk.jfr.internal.Utils;
    35 import jdk.jfr.internal.Utils;
    39 import jdk.jfr.internal.consumer.FileAccess;
    36 import jdk.jfr.internal.consumer.FileAccess;
    40 import jdk.jfr.internal.consumer.RecordingInput;
    37 import jdk.jfr.internal.consumer.RecordingInput;
    41 import jdk.jfr.internal.consumer.RepositoryFiles;
    38 import jdk.jfr.internal.consumer.RepositoryFiles;
    43 /**
    40 /**
    44  * Implementation of an {@code EventStream} that operates against a directory
    41  * Implementation of an {@code EventStream} that operates against a directory
    45  * with chunk files.
    42  * with chunk files.
    46  *
    43  *
    47  */
    44  */
    48 class EventDirectoryStream implements EventStream {
    45 final class EventDirectoryStream extends AbstractEventStream {
       
    46     private final RepositoryFiles repositoryFiles;
       
    47     private final boolean active;
       
    48     private final FileAccess fileAccess;
       
    49     private ChunkParser chunkParser;
       
    50     private long chunkStartNanos;
       
    51     private RecordedEvent[] sortedList;
    49 
    52 
    50     static final class DirectoryStream extends AbstractEventStream {
    53     public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
    51 
    54         super(acc, active);
    52         private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
    55         this.fileAccess = Objects.requireNonNull(fileAccess);
    53         private static final int DEFAULT_ARRAY_SIZE = 10_000;
    56         this.active = active;
    54 
    57         this.repositoryFiles = new RepositoryFiles(fileAccess, p);
    55         private final RepositoryFiles repositoryFiles;
       
    56         private final boolean active;
       
    57         private final FileAccess fileAccess;
       
    58         private ChunkParser chunkParser;
       
    59         private RecordedEvent[] sortedList;
       
    60         protected long chunkStartNanos;
       
    61 
       
    62         public DirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
       
    63             super(acc, active);
       
    64             this.fileAccess = fileAccess;
       
    65             this.active = active;
       
    66             repositoryFiles = new RepositoryFiles(fileAccess, p);
       
    67         }
       
    68 
       
    69         @Override
       
    70         public void process() throws Exception {
       
    71             final StreamConfiguration c1 = configuration;
       
    72             Path path;
       
    73             boolean validStartTime = active || c1.getStartTime() != null;
       
    74             if (validStartTime) {
       
    75                 path = repositoryFiles.firstPath(c1.getStartNanos());
       
    76             } else {
       
    77                 path = repositoryFiles.lastPath();
       
    78             }
       
    79             if (path == null) { // closed
       
    80                 return;
       
    81             }
       
    82             chunkStartNanos = repositoryFiles.getTimestamp(path);
       
    83             try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
       
    84                 chunkParser = new ChunkParser(input, c1.getReuse());
       
    85                 long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
       
    86                 long start = validStartTime ? c1.getStartNanos() : segmentStart;
       
    87                 long end = c1.getEndTime() != null ? c1.getEndNanos() : Long.MAX_VALUE;
       
    88                 while (!isClosed()) {
       
    89                     boolean awaitnewEvent = false;
       
    90                     while (!isClosed() && !chunkParser.isChunkFinished()) {
       
    91                         final StreamConfiguration c2 = configuration;
       
    92                         boolean ordered = c2.getOrdered();
       
    93                         chunkParser.setFlushOperation(flushOperation);
       
    94                         chunkParser.setReuse(c2.getReuse());
       
    95                         chunkParser.setOrdered(ordered);
       
    96                         chunkParser.setFirstNanos(start);
       
    97                         chunkParser.setLastNanos(end);
       
    98                         chunkParser.resetEventCache();
       
    99                         chunkParser.setParserFilter(c2.getFilter());
       
   100                         chunkParser.updateEventParsers();
       
   101                         clearLastDispatch();
       
   102                         if (ordered) {
       
   103                             awaitnewEvent = processOrdered(awaitnewEvent);
       
   104                         } else {
       
   105                             awaitnewEvent = processUnordered(awaitnewEvent);
       
   106                         }
       
   107                         if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > end) {
       
   108                             close();
       
   109                             return;
       
   110                         }
       
   111                     }
       
   112 
       
   113 
       
   114                     if (isClosed()) {
       
   115                         return;
       
   116                     }
       
   117                     long durationNanos = chunkParser.getChunkDuration();
       
   118                     path = repositoryFiles.nextPath(chunkStartNanos + durationNanos);
       
   119                     if (path == null) {
       
   120                         return; // stream closed
       
   121                     }
       
   122                     chunkStartNanos = repositoryFiles.getTimestamp(path);
       
   123                     input.setFile(path);
       
   124                     chunkParser = chunkParser.newChunkParser();
       
   125                     // No need to filter when we reach a new chunk
       
   126                     // start = 0;
       
   127                 }
       
   128             }
       
   129         }
       
   130 
       
   131         private boolean processOrdered(boolean awaitNewEvents) throws IOException {
       
   132             if (sortedList == null) {
       
   133                 sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
       
   134             }
       
   135             int index = 0;
       
   136             while (true) {
       
   137                 RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
       
   138                 if (e == null) {
       
   139                     // wait for new event with next call to
       
   140                     // readStreamingEvent()
       
   141                     awaitNewEvents = true;
       
   142                     break;
       
   143                 }
       
   144                 awaitNewEvents = false;
       
   145                 if (index == sortedList.length) {
       
   146                     sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
       
   147                 }
       
   148                 sortedList[index++] = e;
       
   149             }
       
   150 
       
   151             // no events found
       
   152             if (index == 0 && chunkParser.isChunkFinished()) {
       
   153                 return awaitNewEvents;
       
   154             }
       
   155             // at least 2 events, sort them
       
   156             if (index > 1) {
       
   157                 Arrays.sort(sortedList, 0, index, END_TIME);
       
   158             }
       
   159             for (int i = 0; i < index; i++) {
       
   160                 dispatch(sortedList[i]);
       
   161             }
       
   162             return awaitNewEvents;
       
   163         }
       
   164 
       
   165         private boolean processUnordered(boolean awaitNewEvents) throws IOException {
       
   166             while (true) {
       
   167                 RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
       
   168                 if (e == null) {
       
   169                     return true;
       
   170                 } else {
       
   171                     dispatch(e);
       
   172                 }
       
   173             }
       
   174         }
       
   175 
       
   176         @Override
       
   177         public void close() {
       
   178             setClosed(true);
       
   179             repositoryFiles.close();
       
   180         }
       
   181     }
       
   182 
       
   183     private final AbstractEventStream eventStream;
       
   184 
       
   185     public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess access, boolean active) throws IOException {
       
   186         eventStream = new DirectoryStream(acc, p, access, active);
       
   187     }
    58     }
   188 
    59 
   189     @Override
    60     @Override
   190     public void close() {
    61     public void close() {
   191         eventStream.close();
    62         setClosed(true);
   192     }
    63         runCloseActions();
   193 
    64         repositoryFiles.close();
   194     @Override
       
   195     public void onFlush(Runnable action) {
       
   196         Objects.requireNonNull(action);
       
   197         eventStream.onFlush(action);
       
   198     }
    65     }
   199 
    66 
   200     @Override
    67     @Override
   201     public void start() {
    68     public void start() {
   202         start(Utils.timeToNanos(Instant.now()));
    69         start(Utils.timeToNanos(Instant.now()));
   206     public void startAsync() {
    73     public void startAsync() {
   207         startAsync(Utils.timeToNanos(Instant.now()));
    74         startAsync(Utils.timeToNanos(Instant.now()));
   208     }
    75     }
   209 
    76 
   210     @Override
    77     @Override
   211     public void onEvent(Consumer<RecordedEvent> action) {
    78     public void process() throws Exception {
   212         Objects.requireNonNull(action);
    79         StreamConfiguration c = configuration;
   213         eventStream.onEvent(action);
    80         Path path;
       
    81         boolean validStartTime = active || c.getStartTime() != null;
       
    82         if (validStartTime) {
       
    83             path = repositoryFiles.firstPath(c.getStartNanos());
       
    84         } else {
       
    85             path = repositoryFiles.lastPath();
       
    86         }
       
    87         if (path == null) { // closed
       
    88             return;
       
    89         }
       
    90         chunkStartNanos = repositoryFiles.getTimestamp(path);
       
    91         try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
       
    92             chunkParser = new ChunkParser(input, c.getReuse());
       
    93             long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
       
    94             long filtertStart = validStartTime ? c.getStartNanos() : segmentStart;
       
    95             long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE;
       
    96             while (!isClosed()) {
       
    97                 boolean awaitnewEvent = false;
       
    98                 while (!isClosed() && !chunkParser.isChunkFinished()) {
       
    99                     c = configuration;
       
   100                     boolean ordered = c.getOrdered();
       
   101                     chunkParser.setFlushOperation(getFlushOperation());
       
   102                     chunkParser.setReuse(c.getReuse());
       
   103                     chunkParser.setOrdered(ordered);
       
   104                     chunkParser.setFilterStart(filtertStart);
       
   105                     chunkParser.setFilterEnd(filterEnd);
       
   106                     chunkParser.resetEventCache();
       
   107                     chunkParser.setParserFilter(c.getFilter());
       
   108                     chunkParser.updateEventParsers();
       
   109                     clearLastDispatch();
       
   110                     if (ordered) {
       
   111                         awaitnewEvent = processOrdered(awaitnewEvent);
       
   112                     } else {
       
   113                         awaitnewEvent = processUnordered(awaitnewEvent);
       
   114                     }
       
   115                     if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
       
   116                         close();
       
   117                         return;
       
   118                     }
       
   119                 }
       
   120 
       
   121                 if (isClosed()) {
       
   122                     return;
       
   123                 }
       
   124                 long durationNanos = chunkParser.getChunkDuration();
       
   125                 path = repositoryFiles.nextPath(chunkStartNanos + durationNanos);
       
   126                 if (path == null) {
       
   127                     return; // stream closed
       
   128                 }
       
   129                 chunkStartNanos = repositoryFiles.getTimestamp(path);
       
   130                 input.setFile(path);
       
   131                 chunkParser = chunkParser.newChunkParser();
       
   132                 // TODO: Optimization. No need to filter when we reach a new chunk
       
   133                 // Could set start = 0;
       
   134             }
       
   135         }
   214     }
   136     }
   215 
   137 
   216     @Override
   138     private boolean processOrdered(boolean awaitNewEvents) throws IOException {
   217     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
   139         if (sortedList == null) {
   218         Objects.requireNonNull(eventName);
   140             sortedList = new RecordedEvent[100_000];
   219         Objects.requireNonNull(action);
   141         }
   220         eventStream.onEvent(eventName, action);
   142         int index = 0;
       
   143         while (true) {
       
   144             RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
       
   145             if (e == null) {
       
   146                 // wait for new event with next call to
       
   147                 // readStreamingEvent()
       
   148                 awaitNewEvents = true;
       
   149                 break;
       
   150             }
       
   151             awaitNewEvents = false;
       
   152             if (index == sortedList.length) {
       
   153                 sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
       
   154             }
       
   155             sortedList[index++] = e;
       
   156         }
       
   157 
       
   158         // no events found
       
   159         if (index == 0 && chunkParser.isChunkFinished()) {
       
   160             return awaitNewEvents;
       
   161         }
       
   162         // at least 2 events, sort them
       
   163         if (index > 1) {
       
   164             Arrays.sort(sortedList, 0, index, END_TIME);
       
   165         }
       
   166         for (int i = 0; i < index; i++) {
       
   167             dispatch(sortedList[i]);
       
   168         }
       
   169         return awaitNewEvents;
   221     }
   170     }
   222 
   171 
   223     @Override
   172     private boolean processUnordered(boolean awaitNewEvents) throws IOException {
   224     public void onClose(Runnable action) {
   173         while (true) {
   225         Objects.requireNonNull(action);
   174             RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
   226         eventStream.addCloseAction(action);
   175             if (e == null) {
       
   176                 return true;
       
   177             } else {
       
   178                 dispatch(e);
       
   179             }
       
   180         }
   227     }
   181     }
   228 
       
   229     @Override
       
   230     public boolean remove(Object action) {
       
   231         Objects.requireNonNull(action);
       
   232         return eventStream.remove(action);
       
   233     }
       
   234 
       
   235     @Override
       
   236     public void awaitTermination(Duration timeout) {
       
   237         Objects.requireNonNull(timeout);
       
   238         eventStream.awaitTermination(timeout);
       
   239     }
       
   240 
       
   241     @Override
       
   242     public void awaitTermination() {
       
   243         eventStream.awaitTermination(Duration.ofMillis(0));
       
   244     }
       
   245 
       
   246     @Override
       
   247     public void setReuse(boolean reuse) {
       
   248         eventStream.setReuse(reuse);
       
   249     }
       
   250 
       
   251     @Override
       
   252     public void setOrdered(boolean ordered) {
       
   253         eventStream.setOrdered(ordered);
       
   254     }
       
   255 
       
   256     @Override
       
   257     public void setStartTime(Instant startTime) {
       
   258         eventStream.setStartTime(startTime);
       
   259     }
       
   260 
       
   261     @Override
       
   262     public void setEndTime(Instant endTime) {
       
   263         eventStream.setEndTime(endTime);
       
   264     }
       
   265 
       
   266 
       
   267     public void start(long startNanos) {
       
   268         eventStream.start(startNanos);
       
   269     }
       
   270 
       
   271     public void startAsync(long startNanos) {
       
   272         eventStream.startAsync(startNanos);
       
   273     }
       
   274 
       
   275     @Override
       
   276     public void onError(Consumer<Throwable> action) {
       
   277         // TODO Auto-generated method stub
       
   278 
       
   279     }
       
   280 
       
   281 
       
   282 }
   182 }