src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
branchJEP-349-branch
changeset 57985 be121cbf3284
parent 57971 aa7b1ea52413
child 58020 f082177c5023
equal deleted inserted replaced
57984:269bbe414580 57985:be121cbf3284
    26 package jdk.jfr.consumer;
    26 package jdk.jfr.consumer;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
    29 import java.nio.file.Path;
    29 import java.nio.file.Path;
    30 import java.security.AccessControlContext;
    30 import java.security.AccessControlContext;
    31 import java.security.AccessController;
       
    32 import java.time.Duration;
       
    33 import java.time.Instant;
       
    34 import java.util.Arrays;
    31 import java.util.Arrays;
    35 import java.util.Objects;
    32 import java.util.Objects;
    36 import java.util.function.Consumer;
       
    37 
    33 
    38 import jdk.jfr.internal.consumer.FileAccess;
    34 import jdk.jfr.internal.consumer.FileAccess;
    39 import jdk.jfr.internal.consumer.RecordingInput;
    35 import jdk.jfr.internal.consumer.RecordingInput;
    40 
    36 
    41 /**
    37 /**
    42  * Implementation of an event stream that operates against a recording file.
    38  * Implementation of an event stream that operates against a recording file.
    43  *
    39  *
    44  */
    40  */
    45 final class EventFileStream implements EventStream {
    41 final class EventFileStream extends AbstractEventStream {
       
    42     private final RecordingInput input;
       
    43     private ChunkParser chunkParser;
       
    44     private RecordedEvent[] sortedList;
    46 
    45 
    47     private final static class FileStream extends AbstractEventStream {
    46     public EventFileStream(AccessControlContext acc, Path path) throws IOException {
    48         private static final int DEFAULT_ARRAY_SIZE = 100_000;
    47         super(acc, false);
    49 
       
    50         private final RecordingInput input;
       
    51 
       
    52         private ChunkParser chunkParser;
       
    53         private RecordedEvent[] sortedList;
       
    54 
       
    55         public FileStream(AccessControlContext acc, Path path) throws IOException {
       
    56             super(acc, false);
       
    57             this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
       
    58 ;        }
       
    59 
       
    60         @Override
       
    61         public void process() throws IOException {
       
    62             final StreamConfiguration c1 = configuration;
       
    63             long start = 0;
       
    64             long end = Long.MAX_VALUE;
       
    65             if (c1.getStartTime() != null) {
       
    66                 start = c1.getStartNanos();
       
    67             }
       
    68             if (c1.getEndTime() != null) {
       
    69                 end = c1.getEndNanos();
       
    70             }
       
    71 
       
    72             chunkParser = new ChunkParser(input, c1.getReuse());
       
    73             while (!isClosed()) {
       
    74                 if (chunkParser.getStartNanos() > end) {
       
    75                     close();
       
    76                     return;
       
    77                 }
       
    78                 StreamConfiguration c2 = configuration;
       
    79                 boolean ordered = c2.getOrdered();
       
    80                 chunkParser.setFlushOperation(flushOperation);
       
    81                 chunkParser.setFirstNanos(start);
       
    82                 chunkParser.setLastNanos(end);
       
    83                 chunkParser.setReuse(c2.getReuse());
       
    84                 chunkParser.setOrdered(ordered);
       
    85                 chunkParser.resetEventCache();
       
    86                 chunkParser.setParserFilter(c2.getFiler());
       
    87                 chunkParser.updateEventParsers();
       
    88                 clearLastDispatch();
       
    89                 if (ordered) {
       
    90                     processOrdered();
       
    91                 } else {
       
    92                     processUnordered();
       
    93                 }
       
    94                 if (chunkParser.isLastChunk()) {
       
    95                     return;
       
    96                 }
       
    97                 chunkParser = chunkParser.nextChunkParser();
       
    98             }
       
    99         }
       
   100 
       
   101         private void processOrdered() throws IOException {
       
   102             if (sortedList == null) {
       
   103                 sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
       
   104             }
       
   105             RecordedEvent event;
       
   106             int index = 0;
       
   107             while (true) {
       
   108                 event = chunkParser.readEvent();
       
   109                 if (event == null) {
       
   110                     Arrays.sort(sortedList, 0, index, END_TIME);
       
   111                     for (int i = 0; i < index; i++) {
       
   112                         dispatch(sortedList[i]);
       
   113                     }
       
   114                     return;
       
   115                 }
       
   116                 if (index == sortedList.length) {
       
   117                     RecordedEvent[] tmp = sortedList;
       
   118                     sortedList = new RecordedEvent[2 * tmp.length];
       
   119                     System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
       
   120                 }
       
   121                 sortedList[index++] = event;
       
   122             }
       
   123         }
       
   124 
       
   125         private void processUnordered() throws IOException {
       
   126             RecordedEvent event;
       
   127             while (!isClosed()) {
       
   128                 event = chunkParser.readEvent();
       
   129                 if (event == null) {
       
   130                     return;
       
   131                 }
       
   132                 dispatch(event);
       
   133             }
       
   134         }
       
   135 
       
   136         @Override
       
   137         public void close() {
       
   138             setClosed(true);;
       
   139             runCloseActions();
       
   140             try {
       
   141                 input.close();
       
   142             } catch (IOException e) {
       
   143                 // ignore
       
   144             }
       
   145         }
       
   146     }
       
   147 
       
   148     private final FileStream eventStream;
       
   149 
       
   150     public EventFileStream(Path path, Instant from, Instant to) throws IOException {
       
   151         Objects.requireNonNull(path);
    48         Objects.requireNonNull(path);
   152         eventStream = new FileStream(AccessController.getContext(), path);
    49         this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
   153     }
    50     }
   154 
    51 
   155     @Override
    52     @Override
   156     public void onEvent(Consumer<RecordedEvent> action) {
    53     public void start() {
   157         Objects.requireNonNull(action);
    54         start(0);
   158         eventStream.onEvent(action);
       
   159     }
    55     }
   160 
    56 
   161     @Override
    57     @Override
   162     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
    58     public void startAsync() {
   163         Objects.requireNonNull(eventName);
    59         startAsync(0);
   164         Objects.requireNonNull(action);
       
   165         eventStream.onEvent(eventName, action);
       
   166     }
       
   167 
       
   168     @Override
       
   169     public void onFlush(Runnable action) {
       
   170         Objects.requireNonNull(action);
       
   171         eventStream.onFlush(action);
       
   172     }
       
   173 
       
   174     @Override
       
   175     public void onClose(Runnable action) {
       
   176         Objects.requireNonNull(action);
       
   177         eventStream.addCloseAction(action);
       
   178     }
    60     }
   179 
    61 
   180     @Override
    62     @Override
   181     public void close() {
    63     public void close() {
   182         eventStream.close();
    64         setClosed(true);
       
    65         runCloseActions();
       
    66         try {
       
    67             input.close();
       
    68         } catch (IOException e) {
       
    69             // ignore
       
    70         }
   183     }
    71     }
   184 
    72 
   185     @Override
    73     @Override
   186     public boolean remove(Object action) {
    74     public void process() throws IOException {
   187         Objects.requireNonNull(action);
    75         StreamConfiguration c = configuration;
   188         return eventStream.remove(action);
    76         long start = 0;
       
    77         long end = Long.MAX_VALUE;
       
    78         if (c.getStartTime() != null) {
       
    79             start = c.getStartNanos();
       
    80         }
       
    81         if (c.getEndTime() != null) {
       
    82             end = c.getEndNanos();
       
    83         }
       
    84 
       
    85         chunkParser = new ChunkParser(input, c.getReuse());
       
    86         while (!isClosed()) {
       
    87             if (chunkParser.getStartNanos() > end) {
       
    88                 close();
       
    89                 return;
       
    90             }
       
    91             c = configuration;
       
    92             boolean ordered = c.getOrdered();
       
    93             chunkParser.setFlushOperation(getFlushOperation());
       
    94             chunkParser.setFilterStart(start);
       
    95             chunkParser.setFilterEnd(end);
       
    96             chunkParser.setReuse(c.getReuse());
       
    97             chunkParser.setOrdered(ordered);
       
    98             chunkParser.resetEventCache();
       
    99             chunkParser.setParserFilter(c.getFiler());
       
   100             chunkParser.updateEventParsers();
       
   101             clearLastDispatch();
       
   102             if (ordered) {
       
   103                 processOrdered();
       
   104             } else {
       
   105                 processUnordered();
       
   106             }
       
   107             if (chunkParser.isLastChunk()) {
       
   108                 return;
       
   109             }
       
   110             chunkParser = chunkParser.nextChunkParser();
       
   111         }
   189     }
   112     }
   190 
   113 
   191     @Override
   114     private void processOrdered() throws IOException {
   192     public void start() {
   115         if (sortedList == null) {
   193         eventStream.start(0);
   116             sortedList = new RecordedEvent[10_000];
       
   117         }
       
   118         RecordedEvent event;
       
   119         int index = 0;
       
   120         while (true) {
       
   121             event = chunkParser.readEvent();
       
   122             if (event == null) {
       
   123                 Arrays.sort(sortedList, 0, index, END_TIME);
       
   124                 for (int i = 0; i < index; i++) {
       
   125                     dispatch(sortedList[i]);
       
   126                 }
       
   127                 return;
       
   128             }
       
   129             if (index == sortedList.length) {
       
   130                 RecordedEvent[] tmp = sortedList;
       
   131                 sortedList = new RecordedEvent[2 * tmp.length];
       
   132                 System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
       
   133             }
       
   134             sortedList[index++] = event;
       
   135         }
   194     }
   136     }
   195 
   137 
   196     @Override
   138     private void processUnordered() throws IOException {
   197     public void setReuse(boolean reuse) {
   139         while (!isClosed()) {
   198         eventStream.setReuse(reuse);
   140             RecordedEvent event = chunkParser.readEvent();
   199     }
   141             if (event == null) {
   200 
   142                 return;
   201     @Override
   143             }
   202     public void startAsync() {
   144             dispatch(event);
   203         eventStream.startAsync(0);
   145         }
   204     }
       
   205 
       
   206     @Override
       
   207     public void awaitTermination(Duration timeout) {
       
   208         Objects.requireNonNull(timeout);
       
   209         eventStream.awaitTermination(timeout);
       
   210     }
       
   211 
       
   212     @Override
       
   213     public void awaitTermination() {
       
   214         eventStream.awaitTermination();
       
   215     }
       
   216 
       
   217     @Override
       
   218     public void setOrdered(boolean ordered) {
       
   219         eventStream.setOrdered(ordered);
       
   220     }
       
   221 
       
   222     @Override
       
   223     public void setStartTime(Instant startTime) {
       
   224         eventStream.setStartTime(startTime);
       
   225     }
       
   226 
       
   227     @Override
       
   228     public void setEndTime(Instant endTime) {
       
   229         eventStream.setEndTime(endTime);
       
   230     }
       
   231 
       
   232     @Override
       
   233     public void onError(Consumer<Throwable> action) {
       
   234         // TODO Auto-generated method stub
       
   235 
       
   236     }
   146     }
   237 }
   147 }