src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java
branchJEP-349-branch
changeset 57372 50ca040843ea
child 57373 400db63e4937
equal deleted inserted replaced
57364:29635339ef62 57372:50ca040843ea
       
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.jfr.internal.consumer;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.invoke.MethodHandles;
       
    30 import java.lang.invoke.VarHandle;
       
    31 import java.security.AccessControlContext;
       
    32 import java.security.AccessController;
       
    33 import java.security.PrivilegedAction;
       
    34 import java.time.Duration;
       
    35 import java.util.ArrayList;
       
    36 import java.util.Arrays;
       
    37 import java.util.List;
       
    38 import java.util.Objects;
       
    39 import java.util.function.Consumer;
       
    40 
       
    41 import jdk.jfr.EventType;
       
    42 import jdk.jfr.consumer.LongMap;
       
    43 import jdk.jfr.consumer.RecordedEvent;
       
    44 import jdk.jfr.internal.JVM;
       
    45 import jdk.jfr.internal.LogLevel;
       
    46 import jdk.jfr.internal.LogTag;
       
    47 import jdk.jfr.internal.Logger;
       
    48 
       
    49 abstract public class EventConsumer implements Runnable {
       
    50 
       
    51     private final static class EventDispatcher {
       
    52         public final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
       
    53 
       
    54         final private String eventName;
       
    55         final Consumer<RecordedEvent> action;
       
    56 
       
    57         public EventDispatcher(Consumer<RecordedEvent> action) {
       
    58             this(null, action);
       
    59         }
       
    60 
       
    61         public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
       
    62             this.eventName = eventName;
       
    63             this.action = action;
       
    64         }
       
    65 
       
    66         public void offer(RecordedEvent event) {
       
    67             action.accept(event);
       
    68         }
       
    69 
       
    70         public boolean accepts(EventType eventType) {
       
    71             return (eventName == null || eventType.getName().equals(eventName));
       
    72         }
       
    73     }
       
    74 
       
    75     private final static JVM jvm = JVM.getJVM();
       
    76     private final static VarHandle closedHandle;
       
    77     private final static VarHandle consumersHandle;
       
    78     private final static VarHandle dispatcherHandle;
       
    79     private final static VarHandle flushActionsHandle;
       
    80     private final static VarHandle closeActionsHandle;
       
    81     static {
       
    82         try {
       
    83             MethodHandles.Lookup l = MethodHandles.lookup();
       
    84             closedHandle = l.findVarHandle(EventConsumer.class, "closed", boolean.class);
       
    85             consumersHandle = l.findVarHandle(EventConsumer.class, "consumers", EventDispatcher[].class);
       
    86             dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class);
       
    87             flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class);
       
    88             closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class);
       
    89         } catch (ReflectiveOperationException e) {
       
    90             throw new InternalError(e);
       
    91         }
       
    92     }
       
    93     // set by VarHandle
       
    94     private boolean closed;
       
    95     // set by VarHandle
       
    96     private EventDispatcher[] consumers = new EventDispatcher[0];
       
    97     // set by VarHandle
       
    98     private LongMap<EventDispatcher[]> dispatcher = new LongMap<>();
       
    99     // set by VarHandle
       
   100     private Runnable[] flushActions = new Runnable[0];
       
   101     // set by VarHandle
       
   102     private Runnable[] closeActions = new Runnable[0];
       
   103 
       
   104     protected InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
       
   105 
       
   106     private final AccessControlContext accessControlContext;
       
   107     private boolean started;
       
   108     private Thread thread;
       
   109 
       
   110     protected long startNanos;
       
   111 
       
   112     public EventConsumer(AccessControlContext acc) throws IOException {
       
   113         this.accessControlContext = acc;
       
   114     }
       
   115 
       
   116     public void run() {
       
   117         doPriviliged(() -> execute());
       
   118     }
       
   119 
       
   120     void doPriviliged(Runnable r) {
       
   121         AccessController.doPrivileged(new PrivilegedAction<Void>() {
       
   122             @Override
       
   123             public Void run() {
       
   124                 r.run();
       
   125                 return null;
       
   126             }
       
   127         }, accessControlContext);
       
   128     }
       
   129 
       
   130     private void execute() {
       
   131         jvm.exclude(Thread.currentThread());
       
   132         try {
       
   133             process();
       
   134         } catch (Throwable e) {
       
   135             e.printStackTrace();
       
   136             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpectedexception iterating consumer.");
       
   137         } finally {
       
   138             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
       
   139         }
       
   140     }
       
   141 
       
   142     public abstract void process() throws Exception;
       
   143 
       
   144     public synchronized boolean remove(Object action) {
       
   145         boolean remove = false;
       
   146         Runnable[] updatedFlushActions = removeAction(flushActions, action);
       
   147         if (updatedFlushActions != null) {
       
   148             flushActionsHandle.setVolatile(this, updatedFlushActions);
       
   149             remove = true;
       
   150         }
       
   151         Runnable[] updatedCloseActions = removeAction(closeActions, action);
       
   152         if (updatedCloseActions != null) {
       
   153             closeActionsHandle.setVolatile(this, updatedCloseActions);
       
   154             remove = true;
       
   155         }
       
   156 
       
   157         boolean removeConsumer = false;
       
   158         List<EventDispatcher> list = new ArrayList<>();
       
   159         for (int i = 0; i < consumers.length; i++) {
       
   160             if (consumers[i].action != action) {
       
   161                 list.add(consumers[i]);
       
   162             } else {
       
   163                 removeConsumer = true;
       
   164                 remove = true;
       
   165             }
       
   166         }
       
   167         if (removeConsumer) {
       
   168             EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]);
       
   169             consumersHandle.setVolatile(this, array);
       
   170             dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
       
   171                                                                  // dispatch
       
   172         }
       
   173         return remove;
       
   174     }
       
   175 
       
   176     public void dispatch(RecordedEvent e) {
       
   177         EventDispatcher[] consumerDispatch = dispatcher.get(e.getEventType().getId());
       
   178         if (consumerDispatch == null) {
       
   179             consumerDispatch = EventDispatcher.NO_DISPATCHERS;
       
   180             for (EventDispatcher ec : consumers.clone()) {
       
   181                 if (ec.accepts(e.getEventType())) {
       
   182                     consumerDispatch = merge(consumerDispatch, ec);
       
   183                 }
       
   184             }
       
   185             dispatcher.put(e.getEventType().getId(), consumerDispatch);
       
   186         }
       
   187         for (int i = 0; i < consumerDispatch.length; i++) {
       
   188             consumerDispatch[i].offer(e);
       
   189         }
       
   190 
       
   191     }
       
   192 
       
   193     public void onEvent(Consumer<RecordedEvent> action) {
       
   194         add(new EventDispatcher(action));
       
   195     }
       
   196 
       
   197     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
       
   198         add(new EventDispatcher(eventName, action));
       
   199     }
       
   200 
       
   201     private synchronized void add(EventDispatcher e) {
       
   202         consumersHandle.setVolatile(this, merge(consumers, e));
       
   203         dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
       
   204     }
       
   205 
       
   206     public synchronized void onFlush(Runnable action) {
       
   207         flushActionsHandle.setVolatile(this, addAction(flushActions, action));
       
   208     }
       
   209 
       
   210     public synchronized void addCloseAction(Runnable action) {
       
   211         closeActionsHandle.setVolatile(this, addAction(closeActions, action));
       
   212     }
       
   213 
       
   214     public void setClosed(boolean closed) {
       
   215         closedHandle.setVolatile(this, closed);
       
   216     }
       
   217 
       
   218     final public boolean isClosed() {
       
   219         return closed;
       
   220     }
       
   221 
       
   222     public void runCloseActions() {
       
   223 
       
   224         Runnable[] cas = this.closeActions;
       
   225         for (int i = 0; i < cas.length; i++) {
       
   226             cas[i].run();
       
   227         }
       
   228     }
       
   229 
       
   230     public void runFlushActions() {
       
   231         Runnable[] fas = this.flushActions;
       
   232         for (int i = 0; i < fas.length; i++) {
       
   233             fas[i].run();
       
   234         }
       
   235     }
       
   236 
       
   237     public synchronized void startAsync(long startNanos) {
       
   238         if (started) {
       
   239             throw new IllegalStateException("Event stream can only be started once");
       
   240         }
       
   241         started = true;
       
   242         setStartNanos(startNanos);
       
   243         thread = new Thread(this);
       
   244         thread.setDaemon(true);
       
   245         thread.start();
       
   246     }
       
   247 
       
   248     public void start(long startNanos) {
       
   249         synchronized (this) {
       
   250             if (started) {
       
   251                 throw new IllegalStateException("Event stream can only be started once");
       
   252             }
       
   253             started = true;
       
   254             setStartNanos(startNanos);
       
   255         }
       
   256         run();
       
   257     }
       
   258 
       
   259     public void awaitTermination(Duration timeout) {
       
   260         Objects.requireNonNull(timeout);
       
   261         Thread t = null;
       
   262         synchronized (this) {
       
   263             t = thread;
       
   264         }
       
   265         if (t != null && t != Thread.currentThread()) {
       
   266             try {
       
   267                 t.join(timeout.toMillis());
       
   268             } catch (InterruptedException e) {
       
   269                 // ignore
       
   270             }
       
   271         }
       
   272     }
       
   273 
       
   274     public void awaitTermination() {
       
   275         awaitTermination(Duration.ofMillis(0));
       
   276     }
       
   277 
       
   278     private void setStartNanos(long startNanos) {
       
   279         this.startNanos = startNanos;
       
   280     }
       
   281 
       
   282     protected static EventDispatcher[] merge(EventDispatcher[] current, EventDispatcher add) {
       
   283         EventDispatcher[] array = new EventDispatcher[current.length + 1];
       
   284         System.arraycopy(current, 0, array, 0, current.length);
       
   285         array[current.length] = add;
       
   286         return array;
       
   287     }
       
   288 
       
   289     private static Runnable[] removeAction(Runnable[] array, Object action) {
       
   290         if (array.length == 0) {
       
   291             return null;
       
   292         }
       
   293         boolean remove = false;
       
   294         List<Runnable> list = new ArrayList<>();
       
   295         for (int i = 0; i < array.length; i++) {
       
   296             if (array[i] != action) {
       
   297                 list.add(array[i]);
       
   298             } else {
       
   299                 remove = true;
       
   300             }
       
   301         }
       
   302         if (remove) {
       
   303             return list.toArray(new Runnable[list.size()]);
       
   304         }
       
   305         return null;
       
   306     }
       
   307 
       
   308     private static Runnable[] addAction(Runnable[] array, Runnable action) {
       
   309         ArrayList<Runnable> a = new ArrayList<>();
       
   310         a.addAll(Arrays.asList(array));
       
   311         a.add(action);
       
   312         return a.toArray(new Runnable[0]);
       
   313     }
       
   314 
       
   315 }