src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
author egahlin
Thu, 05 Sep 2019 16:46:50 +0200
branchJEP-349-branch
changeset 58020 f082177c5023
parent 57985 be121cbf3284
child 58129 7b751fe181a5
permissions -rw-r--r--
Improved handling of Thread.interrupt() + cleanup

/*
 * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.jfr.consumer;

import java.io.IOException;
import java.lang.invoke.VarHandle;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import jdk.jfr.EventType;
import jdk.jfr.internal.JVM;
import jdk.jfr.internal.LogLevel;
import jdk.jfr.internal.LogTag;
import jdk.jfr.internal.Logger;
import jdk.jfr.internal.SecuritySupport;

/*
 * Purpose of this class is to simplify the implementation of
 * an event stream. In particular, it handles:
 *
 * - configuration storage
 * - atomic updates to a configuration
 * - dispatch mechanism
 * - error handling
 * - security
 *
 */
abstract class AbstractEventStream implements EventStream {

    final static class EventDispatcher {
        final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
        final String eventName;
        final Consumer<RecordedEvent> action;

        public EventDispatcher(Consumer<RecordedEvent> action) {
            this(null, action);
        }

        public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
            this.eventName = eventName;
            this.action = action;
        }

        public void offer(RecordedEvent event) {
            action.accept(event);
        }

        public boolean accepts(EventType eventType) {
            return (eventName == null || eventType.getName().equals(eventName));
        }
    }

    final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
    private final static AtomicLong counter = new AtomicLong(1);
    private volatile Thread thread;
    private final Object terminated = new Object();
    private final boolean active;
    private final Runnable flushOperation = () -> runFlushActions();
    private final AccessControlContext accessControllerContext;
    private final Object configurationLock = new Object();

    // Modified by updateConfiguration()
    protected volatile StreamConfiguration configuration = new StreamConfiguration();

    public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
        this.accessControllerContext = Objects.requireNonNull(acc);
        this.active = active;
    }

    @Override
    abstract public void start();

    @Override
    abstract public void startAsync();

    @Override
    abstract public void close();

    // Purpose of synchronizing the following methods is
    // to serialize changes to the configuration so only one
    // thread at a time can change the configuration.
    //
    // The purpose is not to guard the configuration field. A new
    // configuration is published using updateConfiguration
    //
    @Override
    public final void setOrdered(boolean ordered) {
        synchronized (configurationLock) {
            updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
        }
    }

    @Override
    public final void setReuse(boolean reuse) {
        synchronized (configurationLock) {
            updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
        }
    }

    @Override
    public final void setStartTime(Instant startTime) {
        Objects.nonNull(startTime);
        synchronized (configurationLock) {
            if (configuration.isStarted()) {
                throw new IllegalStateException("Stream is already started");
            }
            if (startTime.isBefore(Instant.EPOCH)) {
                startTime = Instant.EPOCH;
            }
            updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
        }
    }

    @Override
    public final void setEndTime(Instant endTime) {
        Objects.requireNonNull(endTime);
        synchronized (configurationLock) {
            if (configuration.isStarted()) {
                throw new IllegalStateException("Stream is already started");
            }
            updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
        }
    }

    @Override
    public final void onEvent(Consumer<RecordedEvent> action) {
        Objects.requireNonNull(action);
        synchronized (configurationLock) {
            add(new EventDispatcher(action));
        }
    }

    @Override
    public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
        Objects.requireNonNull(eventName);
        Objects.requireNonNull(action);
        synchronized (configurationLock) {
            add(new EventDispatcher(eventName, action));
        }
    }

    @Override
    public final void onFlush(Runnable action) {
        Objects.requireNonNull(action);
        synchronized (configurationLock) {
            updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
        }
    }

    @Override
    public final void onClose(Runnable action) {
        Objects.requireNonNull(action);
        synchronized (configurationLock) {
            updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
        }
    }

    @Override
    public final void onError(Consumer<Throwable> action) {
        Objects.requireNonNull(action);
        synchronized (configurationLock) {
            updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
        }
    }

    @Override
    public final boolean remove(Object action) {
        Objects.requireNonNull(action);
        synchronized (configurationLock) {
            return updateConfiguration(new StreamConfiguration(configuration).remove(action));
        }
    }

    @Override
    public final void awaitTermination() throws InterruptedException {
        awaitTermination(Duration.ofMillis(0));
    }

    @Override
    public final void awaitTermination(Duration timeout) throws InterruptedException {
        Objects.requireNonNull(timeout);
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        long base = System.currentTimeMillis();
        long now = 0;

        long millis;
        try {
            millis = Math.multiplyExact(timeout.getSeconds(), 1000);
        } catch (ArithmeticException a) {
            millis = Long.MAX_VALUE;
        }
        int nanos = timeout.toNanosPart();
        if (nanos == 0 && millis == 0) {
            synchronized (terminated) {
                while (!isClosed()) {
                    terminated.wait(0);
                }
            }
        } else {
            while (!isClosed()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                synchronized (terminated) {
                    terminated.wait(delay, nanos);
                }
                now = System.currentTimeMillis() - base;
            }
        }
    }

    protected abstract void process() throws Exception;

    protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
        EventType type = event.getEventType();
        EventDispatcher[] dispatchers = null;
        if (type == c.cacheEventType) {
            dispatchers = c.cacheDispatchers;
        } else {
            dispatchers = c.dispatcherLookup.get(type.getId());
            if (dispatchers == null) {
                List<EventDispatcher> list = new ArrayList<>();
                for (EventDispatcher e : c.getDispatchers()) {
                    if (e.accepts(type)) {
                        list.add(e);
                    }
                }
                dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
                c.dispatcherLookup.put(type.getId(), dispatchers);
            }
            c.cacheDispatchers = dispatchers;
        }
        for (int i = 0; i < dispatchers.length; i++) {
            try {
                dispatchers[i].offer(event);
            } catch (Exception e) {
                handleError(e);
            }
        }
    }

    protected final void runCloseActions() {
        Runnable[] closeActions = configuration.getCloseActions();
        for (int i = 0; i < closeActions.length; i++) {
            try {
                closeActions[i].run();
            } catch (Exception e) {
                handleError(e);
            }
        }
    }

    protected final void setClosed(boolean closed) {
        synchronized (configurationLock) {
            updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
        }
    }

    protected final boolean isClosed() {
        return configuration.isClosed();
    }

    protected final void startAsync(long startNanos) {
        startInternal(startNanos);
        Runnable r = () -> run(accessControllerContext);
        thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r);
        thread.start();
    }

    protected final void start(long startNanos) {
        startInternal(startNanos);
        thread = Thread.currentThread();
        run(accessControllerContext);
    }

    protected final Runnable getFlushOperation() {
        return flushOperation;
    }

    private void add(EventDispatcher e) {
        updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
    }

    private boolean updateConfiguration(StreamConfiguration newConfiguration) {
        if (!Thread.holdsLock(configurationLock)) {
            throw new InternalError("Modification of configuration without proper lock");
        }
        if (newConfiguration.hasChanged()) {
            // Publish objects held by configuration object
            VarHandle.releaseFence();
            configuration = newConfiguration;
            return true;
        }
        return false;
    }

    private void startInternal(long startNanos) {
        synchronized (configurationLock) {
            if (configuration.isStarted()) {
                throw new IllegalStateException("Event stream can only be started once");
            }
            StreamConfiguration c = new StreamConfiguration(configuration);
            if (active) {
                c.setStartNanos(startNanos);
            }
            c.setStarted(true);
            updateConfiguration(c);
        }
    }

    private void execute() {
        JVM.getJVM().exclude(Thread.currentThread());
        try {
            process();
        } catch (Exception e) {
            defaultErrorHandler(e);
        } finally {
            Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
            try {
                close();
            } finally {
                synchronized (terminated) {
                    terminated.notifyAll();
                }
            }
        }
    }

    private void handleError(Throwable e) {
        Consumer<?>[] consumers = configuration.errorActions;
        if (consumers.length == 0) {
            defaultErrorHandler(e);
            return;
        }
        for (int i = 0; i < consumers.length; i++) {
            @SuppressWarnings("unchecked")
            Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
            conusmer.accept(e);
        }
    }

    private void runFlushActions() {
        Runnable[] flushActions = configuration.getFlushActions();
        for (int i = 0; i < flushActions.length; i++) {
            try {
                flushActions[i].run();
            } catch (Exception e) {
                handleError(e);
            }
        }
    }

    private void run(AccessControlContext accessControlContext) {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                execute();
                return null;
            }
        }, accessControlContext);
    }

    private String nextThreadName() {
        counter.incrementAndGet();
        return "JFR Event Stream " + counter;
    }

    private void defaultErrorHandler(Throwable e) {
        e.printStackTrace();
    }
}