src/jdk.jfr/share/classes/jdk/jfr/consumer/EventConsumer.java
branchJEP-349-branch
changeset 57702 c75c241c492a
parent 57690 9316d02dd4a5
child 57717 4ce66d271065
equal deleted inserted replaced
57690:9316d02dd4a5 57702:c75c241c492a
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.jfr.consumer;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.invoke.MethodHandles;
       
    30 import java.lang.invoke.VarHandle;
       
    31 import java.security.AccessControlContext;
       
    32 import java.security.AccessController;
       
    33 import java.security.PrivilegedAction;
       
    34 import java.time.Duration;
       
    35 import java.time.Instant;
       
    36 import java.util.ArrayList;
       
    37 import java.util.Arrays;
       
    38 import java.util.Comparator;
       
    39 import java.util.List;
       
    40 import java.util.Objects;
       
    41 import java.util.function.Consumer;
       
    42 
       
    43 import jdk.jfr.EventType;
       
    44 import jdk.jfr.internal.JVM;
       
    45 import jdk.jfr.internal.LogLevel;
       
    46 import jdk.jfr.internal.LogTag;
       
    47 import jdk.jfr.internal.Logger;
       
    48 import jdk.jfr.internal.LongMap;
       
    49 import jdk.jfr.internal.consumer.InternalEventFilter;
       
    50 
       
    51 abstract class EventConsumer implements Runnable {
       
    52 
       
    53     public final static Instant NEXT_EVENT = Instant.now();
       
    54     public final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
       
    55 
       
    56     final static class EventDispatcher {
       
    57         public final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
       
    58 
       
    59         final private String eventName;
       
    60         final Consumer<RecordedEvent> action;
       
    61 
       
    62         public EventDispatcher(Consumer<RecordedEvent> action) {
       
    63             this(null, action);
       
    64         }
       
    65 
       
    66         public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
       
    67             this.eventName = eventName;
       
    68             this.action = action;
       
    69         }
       
    70 
       
    71         public void offer(RecordedEvent event) {
       
    72             action.accept(event);
       
    73         }
       
    74 
       
    75         public boolean accepts(EventType eventType) {
       
    76             return (eventName == null || eventType.getName().equals(eventName));
       
    77         }
       
    78     }
       
    79 
       
    80     private final static JVM jvm = JVM.getJVM();
       
    81     private final static VarHandle closedHandle;
       
    82     private final static VarHandle consumersHandle;
       
    83     private final static VarHandle dispatcherHandle;
       
    84     private final static VarHandle flushActionsHandle;
       
    85     private final static VarHandle closeActionsHandle;
       
    86     private final static VarHandle orderedHandle;
       
    87     private final static VarHandle reuseHandle;
       
    88     private final static VarHandle startTimeHandle;
       
    89     static {
       
    90         try {
       
    91             MethodHandles.Lookup l = MethodHandles.lookup();
       
    92             closedHandle = l.findVarHandle(EventConsumer.class, "closed", boolean.class);
       
    93             consumersHandle = l.findVarHandle(EventConsumer.class, "consumers", EventDispatcher[].class);
       
    94             dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class);
       
    95             flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class);
       
    96             closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class);
       
    97             orderedHandle = l.findVarHandle(EventConsumer.class, "ordered", boolean.class);
       
    98             reuseHandle = l.findVarHandle(EventConsumer.class, "reuse", boolean.class);
       
    99             startTimeHandle = l.findVarHandle(EventConsumer.class, "startTime", Instant.class);
       
   100         } catch (ReflectiveOperationException e) {
       
   101             throw new InternalError(e);
       
   102         }
       
   103     }
       
   104     // set by VarHandle
       
   105     private boolean closed;
       
   106     // set by VarHandle
       
   107     private EventDispatcher[] consumers = new EventDispatcher[0];
       
   108     // set by VarHandle
       
   109     private LongMap<EventDispatcher[]> dispatcher = new LongMap<>();
       
   110     // set by VarHandle
       
   111     private Runnable[] flushActions = new Runnable[0];
       
   112     // set by VarHandle
       
   113     private Runnable[] closeActions = new Runnable[0];
       
   114 
       
   115     private final AccessControlContext accessControlContext;
       
   116     protected volatile InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
       
   117 
       
   118     private boolean started;
       
   119     private Thread thread;
       
   120 
       
   121     protected long startNanos;
       
   122     protected boolean ordered = true;
       
   123     protected boolean reuse = true;
       
   124     Instant startTime;
       
   125 
       
   126     public EventConsumer(AccessControlContext acc) throws IOException {
       
   127         this.accessControlContext = acc;
       
   128     }
       
   129 
       
   130     public void run() {
       
   131         doPriviliged(() -> execute());
       
   132     }
       
   133 
       
   134     void doPriviliged(Runnable r) {
       
   135         AccessController.doPrivileged(new PrivilegedAction<Void>() {
       
   136             @Override
       
   137             public Void run() {
       
   138                 r.run();
       
   139                 return null;
       
   140             }
       
   141         }, accessControlContext);
       
   142     }
       
   143 
       
   144     private void execute() {
       
   145         jvm.exclude(Thread.currentThread());
       
   146         try {
       
   147             updateStartNanos();
       
   148             process();
       
   149         } catch (IOException e) {
       
   150             if (!isClosed()) {
       
   151                 logException(e);
       
   152             }
       
   153         } catch (Exception e) {
       
   154             logException(e);
       
   155         } finally {
       
   156             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
       
   157         }
       
   158     }
       
   159 
       
   160     // User setting overrides default
       
   161     private void updateStartNanos() {
       
   162         if (startTime != null) {
       
   163             try {
       
   164                 setStartNanos(startTime.toEpochMilli() * 1_000_000L);
       
   165             } catch (ArithmeticException ae) {
       
   166                 setStartNanos(Long.MAX_VALUE);
       
   167             }
       
   168         }
       
   169     }
       
   170 
       
   171     private void logException(Exception e) {
       
   172         e.printStackTrace(); // for debugging purposes, remove before
       
   173         // integration
       
   174         Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
       
   175     }
       
   176 
       
   177     public abstract void process() throws IOException;
       
   178 
       
   179     public synchronized boolean remove(Object action) {
       
   180         boolean remove = false;
       
   181         Runnable[] updatedFlushActions = removeAction(flushActions, action);
       
   182         if (updatedFlushActions != null) {
       
   183             flushActionsHandle.setVolatile(this, updatedFlushActions);
       
   184             remove = true;
       
   185         }
       
   186         Runnable[] updatedCloseActions = removeAction(closeActions, action);
       
   187         if (updatedCloseActions != null) {
       
   188             closeActionsHandle.setVolatile(this, updatedCloseActions);
       
   189             remove = true;
       
   190         }
       
   191 
       
   192         boolean removeConsumer = false;
       
   193         List<EventDispatcher> list = new ArrayList<>();
       
   194         for (int i = 0; i < consumers.length; i++) {
       
   195             if (consumers[i].action != action) {
       
   196                 list.add(consumers[i]);
       
   197             } else {
       
   198                 removeConsumer = true;
       
   199                 remove = true;
       
   200             }
       
   201         }
       
   202         if (removeConsumer) {
       
   203             EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]);
       
   204             eventFilter = buildFilter(array);
       
   205             consumersHandle.setVolatile(this, array);
       
   206             dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
       
   207                                                                  // dispatch
       
   208         }
       
   209         return remove;
       
   210     }
       
   211 
       
   212     public void dispatch(RecordedEvent e) {
       
   213         EventDispatcher[] consumerDispatch = dispatcher.get(e.getEventType().getId());
       
   214         if (consumerDispatch == null) {
       
   215             consumerDispatch = EventDispatcher.NO_DISPATCHERS;
       
   216             for (EventDispatcher ec : consumers.clone()) {
       
   217                 if (ec.accepts(e.getEventType())) {
       
   218                     consumerDispatch = merge(consumerDispatch, ec);
       
   219                 }
       
   220             }
       
   221             dispatcher.put(e.getEventType().getId(), consumerDispatch);
       
   222         }
       
   223         for (int i = 0; i < consumerDispatch.length; i++) {
       
   224             try {
       
   225                 consumerDispatch[i].offer(e);
       
   226             } catch (Exception exception) {
       
   227                 // Is this a reasonable behavior for an exception?
       
   228                 // Error will abort the stream.
       
   229             }
       
   230         }
       
   231 
       
   232     }
       
   233 
       
   234     public void onEvent(Consumer<RecordedEvent> action) {
       
   235         add(new EventDispatcher(action));
       
   236     }
       
   237 
       
   238     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
       
   239         add(new EventDispatcher(eventName, action));
       
   240     }
       
   241 
       
   242     InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
       
   243         InternalEventFilter ef = new InternalEventFilter();
       
   244         for (EventDispatcher ed : dispatchers) {
       
   245             String name = ed.eventName;
       
   246             if (name == null) {
       
   247                 return InternalEventFilter.ACCEPT_ALL;
       
   248             }
       
   249             ef.setThreshold(name, 0);
       
   250         }
       
   251         return ef.threadSafe();
       
   252     }
       
   253 
       
   254     private synchronized void add(EventDispatcher e) {
       
   255         EventDispatcher[] dispatchers = merge(consumers, e);
       
   256         eventFilter = buildFilter(dispatchers);
       
   257         consumersHandle.setVolatile(this, dispatchers);
       
   258         dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
       
   259     }
       
   260 
       
   261     public synchronized void onFlush(Runnable action) {
       
   262         flushActionsHandle.setVolatile(this, addAction(flushActions, action));
       
   263     }
       
   264 
       
   265     public synchronized void addCloseAction(Runnable action) {
       
   266         closeActionsHandle.setVolatile(this, addAction(closeActions, action));
       
   267     }
       
   268 
       
   269     public void setClosed(boolean closed) {
       
   270         closedHandle.setVolatile(this, closed);
       
   271     }
       
   272 
       
   273     final public boolean isClosed() {
       
   274         return closed;
       
   275     }
       
   276 
       
   277     public void runCloseActions() {
       
   278 
       
   279         Runnable[] cas = this.closeActions;
       
   280         for (int i = 0; i < cas.length; i++) {
       
   281             cas[i].run();
       
   282         }
       
   283     }
       
   284 
       
   285     public void runFlushActions() {
       
   286         Runnable[] fas = this.flushActions;
       
   287         for (int i = 0; i < fas.length; i++) {
       
   288             fas[i].run();
       
   289         }
       
   290     }
       
   291 
       
   292     public void startAsync(long startNanos) {
       
   293         if (started) {
       
   294             throw new IllegalStateException("Event stream can only be started once");
       
   295         }
       
   296         started = true;
       
   297         setStartNanos(startNanos);
       
   298         thread = new Thread(this);
       
   299         thread.setDaemon(true);
       
   300         thread.start();
       
   301     }
       
   302 
       
   303     public void start(long startNanos) {
       
   304         synchronized (this) {
       
   305             if (started) {
       
   306                 throw new IllegalStateException("Event stream can only be started once");
       
   307             }
       
   308             started = true;
       
   309             setStartNanos(startNanos);
       
   310         }
       
   311         run();
       
   312     }
       
   313 
       
   314     public void awaitTermination(Duration timeout) {
       
   315         Objects.requireNonNull(timeout);
       
   316         Thread t = null;
       
   317         synchronized (this) {
       
   318             t = thread;
       
   319         }
       
   320         if (t != null && t != Thread.currentThread()) {
       
   321             try {
       
   322                 t.join(timeout.toMillis());
       
   323             } catch (InterruptedException e) {
       
   324                 // ignore
       
   325             }
       
   326         }
       
   327     }
       
   328 
       
   329     public void awaitTermination() {
       
   330         awaitTermination(Duration.ofMillis(0));
       
   331     }
       
   332 
       
   333     private void setStartNanos(long startNanos) {
       
   334         this.startNanos = startNanos;
       
   335     }
       
   336 
       
   337     protected static EventDispatcher[] merge(EventDispatcher[] current, EventDispatcher add) {
       
   338         EventDispatcher[] array = new EventDispatcher[current.length + 1];
       
   339         System.arraycopy(current, 0, array, 0, current.length);
       
   340         array[current.length] = add;
       
   341         return array;
       
   342     }
       
   343 
       
   344     private static Runnable[] removeAction(Runnable[] array, Object action) {
       
   345         if (array.length == 0) {
       
   346             return null;
       
   347         }
       
   348         boolean remove = false;
       
   349         List<Runnable> list = new ArrayList<>();
       
   350         for (int i = 0; i < array.length; i++) {
       
   351             if (array[i] != action) {
       
   352                 list.add(array[i]);
       
   353             } else {
       
   354                 remove = true;
       
   355             }
       
   356         }
       
   357         if (remove) {
       
   358             return list.toArray(new Runnable[list.size()]);
       
   359         }
       
   360         return null;
       
   361     }
       
   362 
       
   363     private static Runnable[] addAction(Runnable[] array, Runnable action) {
       
   364         ArrayList<Runnable> a = new ArrayList<>();
       
   365         a.addAll(Arrays.asList(array));
       
   366         a.add(action);
       
   367         return a.toArray(new Runnable[0]);
       
   368     }
       
   369 
       
   370     abstract public void close();
       
   371 
       
   372     public void setReuse(boolean reuse) {
       
   373         reuseHandle.setVolatile(this, reuse);
       
   374     }
       
   375 
       
   376     public void setOrdered(boolean ordered) {
       
   377         orderedHandle.setVolatile(this, ordered);
       
   378     }
       
   379 
       
   380     public void setStartTime(Instant startTime) {
       
   381         Objects.nonNull(startTime);
       
   382         if (started) {
       
   383             throw new IllegalStateException("Stream is already started");
       
   384         }
       
   385         if (startTime.isBefore(Instant.EPOCH)) {
       
   386             startTime = Instant.EPOCH;
       
   387         }
       
   388         startTimeHandle.setVolatile(this, startTime);
       
   389     }
       
   390 
       
   391 }