--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Fri Jul 05 03:36:40 2019 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,363 +0,0 @@
-/*
- * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.jfr.internal.consumer;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.function.Consumer;
-
-import jdk.jfr.EventType;
-import jdk.jfr.consumer.RecordedEvent;
-import jdk.jfr.internal.JVM;
-import jdk.jfr.internal.LogLevel;
-import jdk.jfr.internal.LogTag;
-import jdk.jfr.internal.Logger;
-import jdk.jfr.internal.LongMap;
-
-abstract public class EventConsumer implements Runnable {
-
- private final static class EventDispatcher {
- public final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-
- final private String eventName;
- final Consumer<RecordedEvent> action;
-
- public EventDispatcher(Consumer<RecordedEvent> action) {
- this(null, action);
- }
-
- public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
- this.eventName = eventName;
- this.action = action;
- }
-
- public void offer(RecordedEvent event) {
- action.accept(event);
- }
-
- public boolean accepts(EventType eventType) {
- return (eventName == null || eventType.getName().equals(eventName));
- }
- }
-
- private final static JVM jvm = JVM.getJVM();
- private final static VarHandle closedHandle;
- private final static VarHandle consumersHandle;
- private final static VarHandle dispatcherHandle;
- private final static VarHandle flushActionsHandle;
- private final static VarHandle closeActionsHandle;
- private final static VarHandle orderedHandle;
- private final static VarHandle reuseHandle;
- static {
- try {
- MethodHandles.Lookup l = MethodHandles.lookup();
- closedHandle = l.findVarHandle(EventConsumer.class, "closed", boolean.class);
- consumersHandle = l.findVarHandle(EventConsumer.class, "consumers", EventDispatcher[].class);
- dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class);
- flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class);
- closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class);
- orderedHandle = l.findVarHandle(EventConsumer.class, "ordered", boolean.class);
- reuseHandle = l.findVarHandle(EventConsumer.class, "reuse", boolean.class);
-
- } catch (ReflectiveOperationException e) {
- throw new InternalError(e);
- }
- }
- // set by VarHandle
- private boolean closed;
- // set by VarHandle
- private EventDispatcher[] consumers = new EventDispatcher[0];
- // set by VarHandle
- private LongMap<EventDispatcher[]> dispatcher = new LongMap<>();
- // set by VarHandle
- private Runnable[] flushActions = new Runnable[0];
- // set by VarHandle
- private Runnable[] closeActions = new Runnable[0];
-
- protected boolean ordered = true;
-
- protected boolean reuse = true;
-
- protected volatile InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
-
- private final AccessControlContext accessControlContext;
- private boolean started;
- private Thread thread;
-
- protected long startNanos;
-
- public EventConsumer(AccessControlContext acc) throws IOException {
- this.accessControlContext = acc;
- }
-
- public void run() {
- doPriviliged(() -> execute());
- }
-
- void doPriviliged(Runnable r) {
- AccessController.doPrivileged(new PrivilegedAction<Void>() {
- @Override
- public Void run() {
- r.run();
- return null;
- }
- }, accessControlContext);
- }
-
- private void execute() {
- jvm.exclude(Thread.currentThread());
- try {
- process();
- } catch (IOException e) {
- if (!isClosed()) {
- logException(e);
- }
- } catch (Exception e) {
- logException(e);
- } finally {
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
- }
- }
-
- private void logException(Exception e) {
- e.printStackTrace(); // for debugging purposes, remove before
- // integration
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
- }
-
- public abstract void process() throws IOException;
-
- public synchronized boolean remove(Object action) {
- boolean remove = false;
- Runnable[] updatedFlushActions = removeAction(flushActions, action);
- if (updatedFlushActions != null) {
- flushActionsHandle.setVolatile(this, updatedFlushActions);
- remove = true;
- }
- Runnable[] updatedCloseActions = removeAction(closeActions, action);
- if (updatedCloseActions != null) {
- closeActionsHandle.setVolatile(this, updatedCloseActions);
- remove = true;
- }
-
- boolean removeConsumer = false;
- List<EventDispatcher> list = new ArrayList<>();
- for (int i = 0; i < consumers.length; i++) {
- if (consumers[i].action != action) {
- list.add(consumers[i]);
- } else {
- removeConsumer = true;
- remove = true;
- }
- }
- if (removeConsumer) {
- EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]);
- eventFilter = buildFilter(array);
- consumersHandle.setVolatile(this, array);
- dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
- // dispatch
- }
- return remove;
- }
-
- public void dispatch(RecordedEvent e) {
- EventDispatcher[] consumerDispatch = dispatcher.get(e.getEventType().getId());
- if (consumerDispatch == null) {
- consumerDispatch = EventDispatcher.NO_DISPATCHERS;
- for (EventDispatcher ec : consumers.clone()) {
- if (ec.accepts(e.getEventType())) {
- consumerDispatch = merge(consumerDispatch, ec);
- }
- }
- dispatcher.put(e.getEventType().getId(), consumerDispatch);
- }
- for (int i = 0; i < consumerDispatch.length; i++) {
- try {
- consumerDispatch[i].offer(e);
- } catch (Exception exception) {
- // Is this a reasonable behavior for an exception?
- // Error will abort the stream.
- }
- }
-
- }
-
- public void onEvent(Consumer<RecordedEvent> action) {
- add(new EventDispatcher(action));
- }
-
- public void onEvent(String eventName, Consumer<RecordedEvent> action) {
- add(new EventDispatcher(eventName, action));
- }
-
- InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
- InternalEventFilter ef = new InternalEventFilter();
- for (EventDispatcher ed : dispatchers) {
- String name = ed.eventName;
- if (name == null) {
- return InternalEventFilter.ACCEPT_ALL;
- }
- ef.setThreshold(name, 0);
- }
- return ef.threadSafe();
- }
-
- private synchronized void add(EventDispatcher e) {
- EventDispatcher[] dispatchers = merge(consumers,e);
- eventFilter = buildFilter(dispatchers);
- consumersHandle.setVolatile(this, dispatchers);
- dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
- }
-
- public synchronized void onFlush(Runnable action) {
- flushActionsHandle.setVolatile(this, addAction(flushActions, action));
- }
-
- public synchronized void addCloseAction(Runnable action) {
- closeActionsHandle.setVolatile(this, addAction(closeActions, action));
- }
-
- public void setClosed(boolean closed) {
- closedHandle.setVolatile(this, closed);
- }
-
- final public boolean isClosed() {
- return closed;
- }
-
- public void runCloseActions() {
-
- Runnable[] cas = this.closeActions;
- for (int i = 0; i < cas.length; i++) {
- cas[i].run();
- }
- }
-
- public void runFlushActions() {
- Runnable[] fas = this.flushActions;
- for (int i = 0; i < fas.length; i++) {
- fas[i].run();
- }
- }
-
- public synchronized void startAsync(long startNanos) {
- if (started) {
- throw new IllegalStateException("Event stream can only be started once");
- }
- started = true;
- setStartNanos(startNanos);
- thread = new Thread(this);
- thread.setDaemon(true);
- thread.start();
- }
-
- public void start(long startNanos) {
- synchronized (this) {
- if (started) {
- throw new IllegalStateException("Event stream can only be started once");
- }
- started = true;
- setStartNanos(startNanos);
- }
- run();
- }
-
- public void awaitTermination(Duration timeout) {
- Objects.requireNonNull(timeout);
- Thread t = null;
- synchronized (this) {
- t = thread;
- }
- if (t != null && t != Thread.currentThread()) {
- try {
- t.join(timeout.toMillis());
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
-
- public void awaitTermination() {
- awaitTermination(Duration.ofMillis(0));
- }
-
- private void setStartNanos(long startNanos) {
- this.startNanos = startNanos;
- }
-
- protected static EventDispatcher[] merge(EventDispatcher[] current, EventDispatcher add) {
- EventDispatcher[] array = new EventDispatcher[current.length + 1];
- System.arraycopy(current, 0, array, 0, current.length);
- array[current.length] = add;
- return array;
- }
-
- private static Runnable[] removeAction(Runnable[] array, Object action) {
- if (array.length == 0) {
- return null;
- }
- boolean remove = false;
- List<Runnable> list = new ArrayList<>();
- for (int i = 0; i < array.length; i++) {
- if (array[i] != action) {
- list.add(array[i]);
- } else {
- remove = true;
- }
- }
- if (remove) {
- return list.toArray(new Runnable[list.size()]);
- }
- return null;
- }
-
- private static Runnable[] addAction(Runnable[] array, Runnable action) {
- ArrayList<Runnable> a = new ArrayList<>();
- a.addAll(Arrays.asList(array));
- a.add(action);
- return a.toArray(new Runnable[0]);
- }
-
- abstract public void close();
-
- public void setReuse(boolean reuse) {
- reuseHandle.setVolatile(this, reuse);
- }
-
- public void setOrdered(boolean ordered) {
- orderedHandle.setVolatile(this, ordered);
- }
-
-}
\ No newline at end of file