# HG changeset patch # User egahlin # Date 1565371015 -7200 # Node ID c75c241c492ab940b27e0e8f4bf1f45290ff8784 # Parent 9316d02dd4a57087ee81662ce385f144c0a7e535 Remove obsolete EventConsumer.java file diff -r 9316d02dd4a5 -r c75c241c492a src/jdk.jfr/share/classes/jdk/jfr/consumer/EventConsumer.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventConsumer.java Fri Aug 09 01:18:18 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,391 +0,0 @@ -/* - * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.jfr.consumer; - -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; -import java.security.AccessControlContext; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Objects; -import java.util.function.Consumer; - -import jdk.jfr.EventType; -import jdk.jfr.internal.JVM; -import jdk.jfr.internal.LogLevel; -import jdk.jfr.internal.LogTag; -import jdk.jfr.internal.Logger; -import jdk.jfr.internal.LongMap; -import jdk.jfr.internal.consumer.InternalEventFilter; - -abstract class EventConsumer implements Runnable { - - public final static Instant NEXT_EVENT = Instant.now(); - public final static Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); - - final static class EventDispatcher { - public final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; - - final private String eventName; - final Consumer action; - - public EventDispatcher(Consumer action) { - this(null, action); - } - - public EventDispatcher(String eventName, Consumer action) { - this.eventName = eventName; - this.action = action; - } - - public void offer(RecordedEvent event) { - action.accept(event); - } - - public boolean accepts(EventType eventType) { - return (eventName == null || eventType.getName().equals(eventName)); - } - } - - private final static JVM jvm = JVM.getJVM(); - private final static VarHandle closedHandle; - private final static VarHandle consumersHandle; - private final static VarHandle dispatcherHandle; - private final static VarHandle flushActionsHandle; - private final static VarHandle closeActionsHandle; - private final static VarHandle orderedHandle; - private final static VarHandle reuseHandle; - private final static VarHandle startTimeHandle; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - closedHandle = l.findVarHandle(EventConsumer.class, "closed", boolean.class); - consumersHandle = l.findVarHandle(EventConsumer.class, "consumers", EventDispatcher[].class); - dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class); - flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class); - closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class); - orderedHandle = l.findVarHandle(EventConsumer.class, "ordered", boolean.class); - reuseHandle = l.findVarHandle(EventConsumer.class, "reuse", boolean.class); - startTimeHandle = l.findVarHandle(EventConsumer.class, "startTime", Instant.class); - } catch (ReflectiveOperationException e) { - throw new InternalError(e); - } - } - // set by VarHandle - private boolean closed; - // set by VarHandle - private EventDispatcher[] consumers = new EventDispatcher[0]; - // set by VarHandle - private LongMap dispatcher = new LongMap<>(); - // set by VarHandle - private Runnable[] flushActions = new Runnable[0]; - // set by VarHandle - private Runnable[] closeActions = new Runnable[0]; - - private final AccessControlContext accessControlContext; - protected volatile InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; - - private boolean started; - private Thread thread; - - protected long startNanos; - protected boolean ordered = true; - protected boolean reuse = true; - Instant startTime; - - public EventConsumer(AccessControlContext acc) throws IOException { - this.accessControlContext = acc; - } - - public void run() { - doPriviliged(() -> execute()); - } - - void doPriviliged(Runnable r) { - AccessController.doPrivileged(new PrivilegedAction() { - @Override - public Void run() { - r.run(); - return null; - } - }, accessControlContext); - } - - private void execute() { - jvm.exclude(Thread.currentThread()); - try { - updateStartNanos(); - process(); - } catch (IOException e) { - if (!isClosed()) { - logException(e); - } - } catch (Exception e) { - logException(e); - } finally { - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); - } - } - - // User setting overrides default - private void updateStartNanos() { - if (startTime != null) { - try { - setStartNanos(startTime.toEpochMilli() * 1_000_000L); - } catch (ArithmeticException ae) { - setStartNanos(Long.MAX_VALUE); - } - } - } - - private void logException(Exception e) { - e.printStackTrace(); // for debugging purposes, remove before - // integration - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage()); - } - - public abstract void process() throws IOException; - - public synchronized boolean remove(Object action) { - boolean remove = false; - Runnable[] updatedFlushActions = removeAction(flushActions, action); - if (updatedFlushActions != null) { - flushActionsHandle.setVolatile(this, updatedFlushActions); - remove = true; - } - Runnable[] updatedCloseActions = removeAction(closeActions, action); - if (updatedCloseActions != null) { - closeActionsHandle.setVolatile(this, updatedCloseActions); - remove = true; - } - - boolean removeConsumer = false; - List list = new ArrayList<>(); - for (int i = 0; i < consumers.length; i++) { - if (consumers[i].action != action) { - list.add(consumers[i]); - } else { - removeConsumer = true; - remove = true; - } - } - if (removeConsumer) { - EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]); - eventFilter = buildFilter(array); - consumersHandle.setVolatile(this, array); - dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset - // dispatch - } - return remove; - } - - public void dispatch(RecordedEvent e) { - EventDispatcher[] consumerDispatch = dispatcher.get(e.getEventType().getId()); - if (consumerDispatch == null) { - consumerDispatch = EventDispatcher.NO_DISPATCHERS; - for (EventDispatcher ec : consumers.clone()) { - if (ec.accepts(e.getEventType())) { - consumerDispatch = merge(consumerDispatch, ec); - } - } - dispatcher.put(e.getEventType().getId(), consumerDispatch); - } - for (int i = 0; i < consumerDispatch.length; i++) { - try { - consumerDispatch[i].offer(e); - } catch (Exception exception) { - // Is this a reasonable behavior for an exception? - // Error will abort the stream. - } - } - - } - - public void onEvent(Consumer action) { - add(new EventDispatcher(action)); - } - - public void onEvent(String eventName, Consumer action) { - add(new EventDispatcher(eventName, action)); - } - - InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { - InternalEventFilter ef = new InternalEventFilter(); - for (EventDispatcher ed : dispatchers) { - String name = ed.eventName; - if (name == null) { - return InternalEventFilter.ACCEPT_ALL; - } - ef.setThreshold(name, 0); - } - return ef.threadSafe(); - } - - private synchronized void add(EventDispatcher e) { - EventDispatcher[] dispatchers = merge(consumers, e); - eventFilter = buildFilter(dispatchers); - consumersHandle.setVolatile(this, dispatchers); - dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset - } - - public synchronized void onFlush(Runnable action) { - flushActionsHandle.setVolatile(this, addAction(flushActions, action)); - } - - public synchronized void addCloseAction(Runnable action) { - closeActionsHandle.setVolatile(this, addAction(closeActions, action)); - } - - public void setClosed(boolean closed) { - closedHandle.setVolatile(this, closed); - } - - final public boolean isClosed() { - return closed; - } - - public void runCloseActions() { - - Runnable[] cas = this.closeActions; - for (int i = 0; i < cas.length; i++) { - cas[i].run(); - } - } - - public void runFlushActions() { - Runnable[] fas = this.flushActions; - for (int i = 0; i < fas.length; i++) { - fas[i].run(); - } - } - - public void startAsync(long startNanos) { - if (started) { - throw new IllegalStateException("Event stream can only be started once"); - } - started = true; - setStartNanos(startNanos); - thread = new Thread(this); - thread.setDaemon(true); - thread.start(); - } - - public void start(long startNanos) { - synchronized (this) { - if (started) { - throw new IllegalStateException("Event stream can only be started once"); - } - started = true; - setStartNanos(startNanos); - } - run(); - } - - public void awaitTermination(Duration timeout) { - Objects.requireNonNull(timeout); - Thread t = null; - synchronized (this) { - t = thread; - } - if (t != null && t != Thread.currentThread()) { - try { - t.join(timeout.toMillis()); - } catch (InterruptedException e) { - // ignore - } - } - } - - public void awaitTermination() { - awaitTermination(Duration.ofMillis(0)); - } - - private void setStartNanos(long startNanos) { - this.startNanos = startNanos; - } - - protected static EventDispatcher[] merge(EventDispatcher[] current, EventDispatcher add) { - EventDispatcher[] array = new EventDispatcher[current.length + 1]; - System.arraycopy(current, 0, array, 0, current.length); - array[current.length] = add; - return array; - } - - private static Runnable[] removeAction(Runnable[] array, Object action) { - if (array.length == 0) { - return null; - } - boolean remove = false; - List list = new ArrayList<>(); - for (int i = 0; i < array.length; i++) { - if (array[i] != action) { - list.add(array[i]); - } else { - remove = true; - } - } - if (remove) { - return list.toArray(new Runnable[list.size()]); - } - return null; - } - - private static Runnable[] addAction(Runnable[] array, Runnable action) { - ArrayList a = new ArrayList<>(); - a.addAll(Arrays.asList(array)); - a.add(action); - return a.toArray(new Runnable[0]); - } - - abstract public void close(); - - public void setReuse(boolean reuse) { - reuseHandle.setVolatile(this, reuse); - } - - public void setOrdered(boolean ordered) { - orderedHandle.setVolatile(this, ordered); - } - - public void setStartTime(Instant startTime) { - Objects.nonNull(startTime); - if (started) { - throw new IllegalStateException("Stream is already started"); - } - if (startTime.isBefore(Instant.EPOCH)) { - startTime = Instant.EPOCH; - } - startTimeHandle.setVolatile(this, startTime); - } - -} \ No newline at end of file