src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java
author mgronlun
Wed, 30 Oct 2019 19:43:52 +0100
changeset 58863 c16ac7a2eba4
child 59226 a0f39cc47387
permissions -rw-r--r--
8226511: Implement JFR Event Streaming Reviewed-by: egahlin, mseledtsov, mgronlun Contributed-by: erik.gahlin@oracle.com, mikhailo.seledtsov@oracle.com, markus.gronlund@oracle.com

/*
 * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.jfr.internal.consumer;

import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessControlContext;
import java.time.Instant;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;

import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.internal.JVM;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration;

/**
 * Implementation of an {@code EventStream}} that operates against a directory
 * with chunk files.
 *
 */
public final class EventDirectoryStream extends AbstractEventStream {

    private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator();

    private final RepositoryFiles repositoryFiles;
    private final boolean active;
    private final FileAccess fileAccess;

    private ChunkParser currentParser;
    private long currentChunkStartNanos;
    private RecordedEvent[] sortedCache;
    private int threadExclusionLevel = 0;

    public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
        super(acc, active);
        this.fileAccess = Objects.requireNonNull(fileAccess);
        this.active = active;
        this.repositoryFiles = new RepositoryFiles(fileAccess, p);
    }

    @Override
    public void close() {
        setClosed(true);
        dispatcher().runCloseActions();
        repositoryFiles.close();
    }

    @Override
    public void start() {
        start(Utils.timeToNanos(Instant.now()));
    }

    @Override
    public void startAsync() {
        startAsync(Utils.timeToNanos(Instant.now()));
    }

    @Override
    protected void process() throws IOException {
        JVM jvm = JVM.getJVM();
        Thread t = Thread.currentThread();
        try {
            if (jvm.isExcluded(t)) {
                threadExclusionLevel++;
            } else {
                jvm.exclude(t);
            }
            processRecursionSafe();
        } finally {
            if (threadExclusionLevel > 0) {
                threadExclusionLevel--;
            } else {
                jvm.include(t);
            }
        }
    }

    protected void processRecursionSafe() throws IOException {
        Dispatcher disp = dispatcher();

        Path path;
        boolean validStartTime = active || disp.startTime != null;
        if (validStartTime) {
            path = repositoryFiles.firstPath(disp.startNanos);
        } else {
            path = repositoryFiles.lastPath();
        }
        if (path == null) { // closed
            return;
        }
        currentChunkStartNanos = repositoryFiles.getTimestamp(path);
        try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
            currentParser = new ChunkParser(input, disp.parserConfiguration);
            long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration();
            long filterStart = validStartTime ? disp.startNanos : segmentStart;
            long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;

            while (!isClosed()) {
                boolean awaitnewEvent = false;
                while (!isClosed() && !currentParser.isChunkFinished()) {
                    disp = dispatcher();
                    ParserConfiguration pc = disp.parserConfiguration;
                    pc.filterStart = filterStart;
                    pc.filterEnd = filterEnd;
                    currentParser.updateConfiguration(pc, true);
                    currentParser.setFlushOperation(getFlushOperation());
                    if (pc.isOrdered()) {
                        awaitnewEvent = processOrdered(disp, awaitnewEvent);
                    } else {
                        awaitnewEvent = processUnordered(disp, awaitnewEvent);
                    }
                    if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
                        close();
                        return;
                    }
                }

                if (isClosed()) {
                    return;
                }
                long durationNanos = currentParser.getChunkDuration();
                if (durationNanos == 0) {
                    // Avoid reading the same chunk again and again if
                    // duration is 0 ns
                    durationNanos++;
                }
                path = repositoryFiles.nextPath(currentChunkStartNanos + durationNanos);
                if (path == null) {
                    return; // stream closed
                }
                currentChunkStartNanos = repositoryFiles.getTimestamp(path);
                input.setFile(path);
                currentParser = currentParser.newChunkParser();
                // TODO: Optimization. No need filter when we reach new chunk
                // Could set start = 0;
            }
        }
    }

    private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
        if (sortedCache == null) {
            sortedCache = new RecordedEvent[100_000];
        }
        int index = 0;
        while (true) {
            RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
            if (e == null) {
                // wait for new event with next call to
                // readStreamingEvent()
                awaitNewEvents = true;
                break;
            }
            awaitNewEvents = false;
            if (index == sortedCache.length) {
                sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2);
            }
            sortedCache[index++] = e;
        }

        // no events found
        if (index == 0 && currentParser.isChunkFinished()) {
            return awaitNewEvents;
        }
        // at least 2 events, sort them
        if (index > 1) {
            Arrays.sort(sortedCache, 0, index, EVENT_COMPARATOR);
        }
        for (int i = 0; i < index; i++) {
            c.dispatch(sortedCache[i]);
        }
        return awaitNewEvents;
    }

    private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
        while (true) {
            RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
            if (e == null) {
                return true;
            } else {
                c.dispatch(e);
            }
        }
    }
}