src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
author egahlin
Thu, 15 Aug 2019 02:55:30 +0200
branchJEP-349-branch
changeset 57753 4883a96b6d37
parent 57690 9316d02dd4a5
child 57861 86022e34ba63
permissions -rw-r--r--
Fix type in comment

/*
 * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.jfr.consumer;

import java.io.IOException;
import java.lang.invoke.VarHandle;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

import jdk.jfr.EventType;
import jdk.jfr.internal.JVM;
import jdk.jfr.internal.LogLevel;
import jdk.jfr.internal.LogTag;
import jdk.jfr.internal.Logger;
import jdk.jfr.internal.LongMap;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.InternalEventFilter;

/*
 * Purpose of this class is to simplify the implementation of
 * an event stream. In particular, it handles:
 *
 * - configuration storage
 * - atomic updates to a configuration
 * - dispatch mechanism
 * - error handling
 * - security
 *
 */
abstract class AbstractEventStream implements Runnable {

    public static final class StreamConfiguration {
        private static final Runnable[] NO_ACTIONS = new Runnable[0];

        private Runnable[] flushActions = NO_ACTIONS;
        private Runnable[] closeActions = NO_ACTIONS;
        private EventDispatcher[] dispatchers = NO_DISPATCHERS;
        private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
        private boolean closed = false;
        private boolean reuse = true;
        private boolean ordered = true;
        private Instant startTime = null;
        private Instant endTime = null;
        private boolean started = false;
        private long startNanos = 0;
        private long endNanos = Long.MAX_VALUE;
        private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
        private boolean changed = false;

        public StreamConfiguration(StreamConfiguration configuration) {
            this.flushActions = configuration.flushActions;
            this.closeActions = configuration.closeActions;
            this.dispatchers = configuration.dispatchers;
            this.eventFilter = configuration.eventFilter;
            this.closed = configuration.closed;
            this.reuse = configuration.reuse;
            this.ordered = configuration.ordered;
            this.startTime = configuration.startTime;
            this.endTime = configuration.endTime;
            this.started = configuration.started;
            this.startNanos = configuration.startNanos;
            this.endNanos = configuration.endNanos;
            this.dispatcherLookup = configuration.dispatcherLookup;
        }

        public StreamConfiguration() {
        }

        final public StreamConfiguration remove(Object action) {
            flushActions = remove(flushActions, action);
            closeActions = remove(closeActions, action);
            dispatchers = removeDispatch(dispatchers, action);
            return this;
        }

        final public StreamConfiguration addDispatcher(EventDispatcher e) {
            dispatchers = add(dispatchers, e);
            eventFilter = buildFilter(dispatchers);
            dispatcherLookup = new LongMap<>();
            return this;
        }

        final public StreamConfiguration addFlushAction(Runnable action) {
            flushActions = add(flushActions, action);
            return this;
        }

        final public StreamConfiguration addCloseAction(Runnable action) {
            closeActions = add(closeActions, action);
            return this;
        }

        final public StreamConfiguration setClosed(boolean closed) {
            this.closed = closed;
            changed = true;
            return this;
        }

        final public boolean isClosed() {
            return closed;
        }

        final public Runnable[] getCloseActions() {
            return closeActions;
        }

        final public Runnable[] getFlushActions() {
            return flushActions;
        }

        private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
            List<EventDispatcher> list = new ArrayList<>(array.length);
            boolean modified = false;
            for (int i = 0; i < array.length; i++) {
                if (array[i].action != action) {
                    list.add(array[i]);
                } else {
                    modified = true;
                }
            }
            EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
            if (modified) {
                eventFilter = buildFilter(result);
                dispatcherLookup = new LongMap<>();
                changed = true;
            }
            return result;
        }

        private <T> T[] remove(T[] array, Object action) {
            List<T> list = new ArrayList<>(array.length);
            for (int i = 0; i < array.length; i++) {
                if (array[i] != action) {
                    list.add(array[i]);
                } else {
                    changed = true;
                }
            }
            return list.toArray(array);
        }

        private <T> T[] add(T[] array, T object) {
            List<T> list = new ArrayList<>(Arrays.asList(array));
            list.add(object);
            changed = true;
            return list.toArray(array);
        }

        private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
            InternalEventFilter ef = new InternalEventFilter();
            for (EventDispatcher ed : dispatchers) {
                String name = ed.eventName;
                if (name == null) {
                    return InternalEventFilter.ACCEPT_ALL;
                }
                ef.setThreshold(name, 0);
            }
            return ef.threadSafe();
        }

        final public StreamConfiguration setReuse(boolean reuse) {
            this.reuse = reuse;
            changed = true;
            return this;
        }

        final public StreamConfiguration setOrdered(boolean ordered) {
            this.ordered = ordered;
            changed = true;
            return this;
        }
        public StreamConfiguration setEndTime(Instant endTime) {
            this.endTime = endTime;
            this.endNanos = Utils.timeToNanos(endTime);
            changed = true;
            return this;
        }

        final public StreamConfiguration setStartTime(Instant startTime) {
            this.startTime = startTime;
            this.startNanos = Utils.timeToNanos(startTime);
            changed = true;
            return this;
        }

        final public Instant getStartTime() {
            return startTime;
        }

        public Object getEndTime() {
            return endTime;
        }

        final public boolean isStarted() {
            return started;
        }

        final public StreamConfiguration setStartNanos(long startNanos) {
            this.startNanos = startNanos;
            changed = true;
            return this;
        }

        final public void setStarted(boolean started) {
            this.started = started;
            changed = true;
        }

        final public boolean hasChanged() {
            return changed;
        }

        final public boolean getReuse() {
            return reuse;
        }

        final public boolean getOrdered() {
            return ordered;
        }

        final public InternalEventFilter getFiler() {
            return eventFilter;
        }

        final public long getStartNanos() {
            return startNanos;
        }

        final public long getEndNanos() {
            return endNanos;
        }

        final public InternalEventFilter getFilter() {
            return eventFilter;
        }

        final public String toString() {
            StringBuilder sb = new StringBuilder();
            for (Runnable flush : flushActions) {
                sb.append("Flush Action: ").append(flush).append("\n");
            }
            for (Runnable close : closeActions) {
                sb.append("Close Action: " + close + "\n");
            }
            for (EventDispatcher dispatcher : dispatchers) {
                sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
            }
            sb.append("Closed: ").append(closed).append("\n");
            sb.append("Reuse: ").append(reuse).append("\n");
            sb.append("Ordered: ").append(ordered).append("\n");
            sb.append("Started: ").append(started).append("\n");
            sb.append("Start Time: ").append(startTime).append("\n");
            sb.append("Start Nanos: ").append(startNanos).append("\n");
            sb.append("End Time: ").append(endTime).append("\n");
            sb.append("End Nanos: ").append(endNanos).append("\n");
            return sb.toString();
        }

        private EventDispatcher[] getDispatchers() {
            return dispatchers;
        }




    }

    final static class EventDispatcher {
        final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];

        final private String eventName;
        final private Consumer<RecordedEvent> action;

        public EventDispatcher(Consumer<RecordedEvent> action) {
            this(null, action);
        }

        public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
            this.eventName = eventName;
            this.action = action;
        }

        public void offer(RecordedEvent event) {
            action.accept(event);
        }

        public boolean accepts(EventType eventType) {
            return (eventName == null || eventType.getName().equals(eventName));
        }
    }

    public final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);

    private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
    private final AccessControlContext accessControlContext;
    private final Thread thread;
    private final boolean active;
    // Updated by updateConfiguration()
    protected StreamConfiguration configuration = new StreamConfiguration();

    // Cache the last event type and dispatch.
    private EventType lastEventType;
    private EventDispatcher[] lastEventDispatch;

    public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
        this.accessControlContext = acc;
        this.active = active;
        // Create thread object in constructor to ensure caller has
        // permission before constructing object
        thread = new Thread(this);
        thread.setDaemon(true);
    }

    public final void run() {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                execute();
                return null;
            }
        }, accessControlContext);

    }

    private void execute() {
        JVM.getJVM().exclude(Thread.currentThread());
        try {
            process();
        } catch (IOException e) {
            if (!isClosed()) {
                logException(e);
            }
        } catch (Exception e) {
            logException(e);
        } finally {
            Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
        }
    }

    private void logException(Exception e) {
        // FIXME: e.printStackTrace(); for debugging purposes,
        // remove before before integration
        e.printStackTrace();
        Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
    }

    public abstract void process() throws IOException;

    protected final void clearLastDispatch() {
        lastEventDispatch = null;
        lastEventType = null;
    }

    protected final void dispatch(RecordedEvent event) {
        EventType type = event.getEventType();
        EventDispatcher[] ret = null;
        if (type == lastEventType) {
            ret = lastEventDispatch;
        } else {
            ret = configuration.dispatcherLookup.get(type.getId());
            if (ret == null) {
                List<EventDispatcher> list = new ArrayList<>();
                for (EventDispatcher e : configuration.getDispatchers()) {
                    if (e.accepts(type)) {
                        list.add(e);
                    }
                }
                ret = list.isEmpty() ? NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
                configuration.dispatcherLookup.put(type.getId(), ret);
            }
            lastEventDispatch = ret;
        }
        for (int i = 0; i < ret.length; i++) {
            try {
                ret[i].offer(event);
            } catch (Exception e) {
                logException(e);
            }
        }
    }

    public final void runCloseActions() {
        Runnable[] cas = configuration.getCloseActions();
        for (int i = 0; i < cas.length; i++) {
            try {
                cas[i].run();
            } catch (Exception e) {
                logException(e);
            }
        }
    }

    public final void runFlushActions() {
        Runnable[] fas = configuration.getFlushActions();
        for (int i = 0; i < fas.length; i++) {
            try {
                fas[i].run();
            } catch (Exception e) {
                logException(e);
            }
        }
    }

    // Purpose of synchronizing the following methods is
    // to serialize changes to the configuration, so only one
    // thread at a time can change the configuration.
    //
    // The purpose is not to guard the configuration field. A new
    // configuration is published using updateConfiguration
    //
    public final synchronized boolean remove(Object action) {
        return updateConfiguration(new StreamConfiguration(configuration).remove(action));
    }

    public final synchronized void onEvent(Consumer<RecordedEvent> action) {
        add(new EventDispatcher(action));
    }

    public final synchronized void onEvent(String eventName, Consumer<RecordedEvent> action) {
        add(new EventDispatcher(eventName, action));
    }

    private final synchronized void add(EventDispatcher e) {
        updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
    }

    public final synchronized void onFlush(Runnable action) {
        updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
    }

    public final synchronized void addCloseAction(Runnable action) {
        updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
    }

    public final synchronized void setClosed(boolean closed) {
        updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
    }

    public final synchronized void setReuse(boolean reuse) {
        updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
    }

    public final synchronized void setOrdered(boolean ordered) {
        updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
    }

    public final synchronized void setStartNanos(long startNanos) {
        updateConfiguration(new StreamConfiguration(configuration).setStartNanos(startNanos));
    }

    public final synchronized void setStartTime(Instant startTime) {
        Objects.nonNull(startTime);
        if (configuration.isStarted()) {
            throw new IllegalStateException("Stream is already started");
        }
        if (startTime.isBefore(Instant.EPOCH)) {
            startTime = Instant.EPOCH;
        }
        updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
    }

    public final void setEndTime(Instant endTime) {
    if (configuration.isStarted()) {
        throw new IllegalStateException("Stream is already started");
    }
    updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
}


    protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
        // Changes to the configuration must be serialized, so make
        // sure that we have the monitor
        Thread.holdsLock(this);
        if (newConfiguration.hasChanged()) {
            // Publish objects indirectly held by new configuration object
            VarHandle.releaseFence();
            configuration = newConfiguration;
            // Publish the field reference. Making the field volatile
            // would be an alternative, but it is repeatedly read.
            VarHandle.releaseFence();
            return true;
        }
        return false;
    }

    public final boolean isClosed() {
        return configuration.isClosed();
    }

    public final void startAsync(long startNanos) {
        startInternal(startNanos);
        thread.start();
    }

    public final void start(long startNanos) {
        startInternal(startNanos);
        run();
    }

    private void startInternal(long startNanos) {
        synchronized (this) {
            if (configuration.isStarted()) {
                throw new IllegalStateException("Event stream can only be started once");
            }
            StreamConfiguration c = new StreamConfiguration(configuration);
            if (active) {
                c.setStartNanos(startNanos);
            }
            c.setStarted(true);
            updateConfiguration(c);
        }
    }

    public final void awaitTermination(Duration timeout) {
        Objects.requireNonNull(timeout);
        if (thread != Thread.currentThread()) {
            try {
                thread.join(timeout.toMillis());
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

    public final void awaitTermination() {
        awaitTermination(Duration.ofMillis(0));
    }

    abstract public void close();

}