src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java
changeset 58863 c16ac7a2eba4
child 59226 a0f39cc47387
equal deleted inserted replaced
58861:2c3cc4b01880 58863:c16ac7a2eba4
       
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.jfr.internal.consumer;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.nio.file.Path;
       
    30 import java.security.AccessControlContext;
       
    31 import java.time.Instant;
       
    32 import java.util.Arrays;
       
    33 import java.util.Comparator;
       
    34 import java.util.Objects;
       
    35 
       
    36 import jdk.jfr.consumer.RecordedEvent;
       
    37 import jdk.jfr.internal.JVM;
       
    38 import jdk.jfr.internal.Utils;
       
    39 import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration;
       
    40 
       
    41 /**
       
    42  * Implementation of an {@code EventStream}} that operates against a directory
       
    43  * with chunk files.
       
    44  *
       
    45  */
       
    46 public final class EventDirectoryStream extends AbstractEventStream {
       
    47 
       
    48     private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator();
       
    49 
       
    50     private final RepositoryFiles repositoryFiles;
       
    51     private final boolean active;
       
    52     private final FileAccess fileAccess;
       
    53 
       
    54     private ChunkParser currentParser;
       
    55     private long currentChunkStartNanos;
       
    56     private RecordedEvent[] sortedCache;
       
    57     private int threadExclusionLevel = 0;
       
    58 
       
    59     public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
       
    60         super(acc, active);
       
    61         this.fileAccess = Objects.requireNonNull(fileAccess);
       
    62         this.active = active;
       
    63         this.repositoryFiles = new RepositoryFiles(fileAccess, p);
       
    64     }
       
    65 
       
    66     @Override
       
    67     public void close() {
       
    68         setClosed(true);
       
    69         dispatcher().runCloseActions();
       
    70         repositoryFiles.close();
       
    71     }
       
    72 
       
    73     @Override
       
    74     public void start() {
       
    75         start(Utils.timeToNanos(Instant.now()));
       
    76     }
       
    77 
       
    78     @Override
       
    79     public void startAsync() {
       
    80         startAsync(Utils.timeToNanos(Instant.now()));
       
    81     }
       
    82 
       
    83     @Override
       
    84     protected void process() throws IOException {
       
    85         JVM jvm = JVM.getJVM();
       
    86         Thread t = Thread.currentThread();
       
    87         try {
       
    88             if (jvm.isExcluded(t)) {
       
    89                 threadExclusionLevel++;
       
    90             } else {
       
    91                 jvm.exclude(t);
       
    92             }
       
    93             processRecursionSafe();
       
    94         } finally {
       
    95             if (threadExclusionLevel > 0) {
       
    96                 threadExclusionLevel--;
       
    97             } else {
       
    98                 jvm.include(t);
       
    99             }
       
   100         }
       
   101     }
       
   102 
       
   103     protected void processRecursionSafe() throws IOException {
       
   104         Dispatcher disp = dispatcher();
       
   105 
       
   106         Path path;
       
   107         boolean validStartTime = active || disp.startTime != null;
       
   108         if (validStartTime) {
       
   109             path = repositoryFiles.firstPath(disp.startNanos);
       
   110         } else {
       
   111             path = repositoryFiles.lastPath();
       
   112         }
       
   113         if (path == null) { // closed
       
   114             return;
       
   115         }
       
   116         currentChunkStartNanos = repositoryFiles.getTimestamp(path);
       
   117         try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
       
   118             currentParser = new ChunkParser(input, disp.parserConfiguration);
       
   119             long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration();
       
   120             long filterStart = validStartTime ? disp.startNanos : segmentStart;
       
   121             long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
       
   122 
       
   123             while (!isClosed()) {
       
   124                 boolean awaitnewEvent = false;
       
   125                 while (!isClosed() && !currentParser.isChunkFinished()) {
       
   126                     disp = dispatcher();
       
   127                     ParserConfiguration pc = disp.parserConfiguration;
       
   128                     pc.filterStart = filterStart;
       
   129                     pc.filterEnd = filterEnd;
       
   130                     currentParser.updateConfiguration(pc, true);
       
   131                     currentParser.setFlushOperation(getFlushOperation());
       
   132                     if (pc.isOrdered()) {
       
   133                         awaitnewEvent = processOrdered(disp, awaitnewEvent);
       
   134                     } else {
       
   135                         awaitnewEvent = processUnordered(disp, awaitnewEvent);
       
   136                     }
       
   137                     if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
       
   138                         close();
       
   139                         return;
       
   140                     }
       
   141                 }
       
   142 
       
   143                 if (isClosed()) {
       
   144                     return;
       
   145                 }
       
   146                 long durationNanos = currentParser.getChunkDuration();
       
   147                 if (durationNanos == 0) {
       
   148                     // Avoid reading the same chunk again and again if
       
   149                     // duration is 0 ns
       
   150                     durationNanos++;
       
   151                 }
       
   152                 path = repositoryFiles.nextPath(currentChunkStartNanos + durationNanos);
       
   153                 if (path == null) {
       
   154                     return; // stream closed
       
   155                 }
       
   156                 currentChunkStartNanos = repositoryFiles.getTimestamp(path);
       
   157                 input.setFile(path);
       
   158                 currentParser = currentParser.newChunkParser();
       
   159                 // TODO: Optimization. No need filter when we reach new chunk
       
   160                 // Could set start = 0;
       
   161             }
       
   162         }
       
   163     }
       
   164 
       
   165     private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
       
   166         if (sortedCache == null) {
       
   167             sortedCache = new RecordedEvent[100_000];
       
   168         }
       
   169         int index = 0;
       
   170         while (true) {
       
   171             RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
       
   172             if (e == null) {
       
   173                 // wait for new event with next call to
       
   174                 // readStreamingEvent()
       
   175                 awaitNewEvents = true;
       
   176                 break;
       
   177             }
       
   178             awaitNewEvents = false;
       
   179             if (index == sortedCache.length) {
       
   180                 sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2);
       
   181             }
       
   182             sortedCache[index++] = e;
       
   183         }
       
   184 
       
   185         // no events found
       
   186         if (index == 0 && currentParser.isChunkFinished()) {
       
   187             return awaitNewEvents;
       
   188         }
       
   189         // at least 2 events, sort them
       
   190         if (index > 1) {
       
   191             Arrays.sort(sortedCache, 0, index, EVENT_COMPARATOR);
       
   192         }
       
   193         for (int i = 0; i < index; i++) {
       
   194             c.dispatch(sortedCache[i]);
       
   195         }
       
   196         return awaitNewEvents;
       
   197     }
       
   198 
       
   199     private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
       
   200         while (true) {
       
   201             RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
       
   202             if (e == null) {
       
   203                 return true;
       
   204             } else {
       
   205                 c.dispatch(e);
       
   206             }
       
   207         }
       
   208     }
       
   209 }