src/jdk.jfr/share/classes/jdk/jfr/consumer/EventConsumer.java
branchJEP-349-branch
changeset 57449 099789ceff7d
child 57452 6fabe73e5d9a
equal deleted inserted replaced
57434:216bf2e3b542 57449:099789ceff7d
       
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.jfr.consumer;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.invoke.MethodHandles;
       
    30 import java.lang.invoke.VarHandle;
       
    31 import java.security.AccessControlContext;
       
    32 import java.security.AccessController;
       
    33 import java.security.PrivilegedAction;
       
    34 import java.time.Duration;
       
    35 import java.time.Instant;
       
    36 import java.util.ArrayList;
       
    37 import java.util.Arrays;
       
    38 import java.util.List;
       
    39 import java.util.Objects;
       
    40 import java.util.function.Consumer;
       
    41 
       
    42 import jdk.jfr.EventType;
       
    43 import jdk.jfr.internal.JVM;
       
    44 import jdk.jfr.internal.LogLevel;
       
    45 import jdk.jfr.internal.LogTag;
       
    46 import jdk.jfr.internal.Logger;
       
    47 import jdk.jfr.internal.LongMap;
       
    48 import jdk.jfr.internal.consumer.InternalEventFilter;
       
    49 
       
    50 abstract class EventConsumer implements Runnable {
       
    51 
       
    52     public final static Instant NEXT_EVENT = Instant.now();
       
    53 
       
    54     final static class EventDispatcher {
       
    55         public final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
       
    56 
       
    57         final private String eventName;
       
    58         final Consumer<RecordedEvent> action;
       
    59 
       
    60         public EventDispatcher(Consumer<RecordedEvent> action) {
       
    61             this(null, action);
       
    62         }
       
    63 
       
    64         public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
       
    65             this.eventName = eventName;
       
    66             this.action = action;
       
    67         }
       
    68 
       
    69         public void offer(RecordedEvent event) {
       
    70             action.accept(event);
       
    71         }
       
    72 
       
    73         public boolean accepts(EventType eventType) {
       
    74             return (eventName == null || eventType.getName().equals(eventName));
       
    75         }
       
    76     }
       
    77 
       
    78     private final static JVM jvm = JVM.getJVM();
       
    79     private final static VarHandle closedHandle;
       
    80     private final static VarHandle consumersHandle;
       
    81     private final static VarHandle dispatcherHandle;
       
    82     private final static VarHandle flushActionsHandle;
       
    83     private final static VarHandle closeActionsHandle;
       
    84     private final static VarHandle orderedHandle;
       
    85     private final static VarHandle reuseHandle;
       
    86     private final static VarHandle startTimeHandle;
       
    87     static {
       
    88         try {
       
    89             MethodHandles.Lookup l = MethodHandles.lookup();
       
    90             closedHandle = l.findVarHandle(EventConsumer.class, "closed", boolean.class);
       
    91             consumersHandle = l.findVarHandle(EventConsumer.class, "consumers", EventDispatcher[].class);
       
    92             dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class);
       
    93             flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class);
       
    94             closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class);
       
    95             orderedHandle = l.findVarHandle(EventConsumer.class, "ordered", boolean.class);
       
    96             reuseHandle = l.findVarHandle(EventConsumer.class, "reuse", boolean.class);
       
    97             startTimeHandle = l.findVarHandle(EventConsumer.class, "startTime", Instant.class);
       
    98         } catch (ReflectiveOperationException e) {
       
    99             throw new InternalError(e);
       
   100         }
       
   101     }
       
   102     // set by VarHandle
       
   103     private boolean closed;
       
   104     // set by VarHandle
       
   105     private EventDispatcher[] consumers = new EventDispatcher[0];
       
   106     // set by VarHandle
       
   107     private LongMap<EventDispatcher[]> dispatcher = new LongMap<>();
       
   108     // set by VarHandle
       
   109     private Runnable[] flushActions = new Runnable[0];
       
   110     // set by VarHandle
       
   111     private Runnable[] closeActions = new Runnable[0];
       
   112 
       
   113     private final AccessControlContext accessControlContext;
       
   114     protected volatile InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
       
   115 
       
   116     private boolean started;
       
   117     private Thread thread;
       
   118 
       
   119     protected long startNanos;
       
   120     protected boolean ordered = true;
       
   121     protected boolean reuse = true;
       
   122     Instant startTime;
       
   123 
       
   124     public EventConsumer(AccessControlContext acc) throws IOException {
       
   125         this.accessControlContext = acc;
       
   126     }
       
   127 
       
   128     public void run() {
       
   129         doPriviliged(() -> execute());
       
   130     }
       
   131 
       
   132     void doPriviliged(Runnable r) {
       
   133         AccessController.doPrivileged(new PrivilegedAction<Void>() {
       
   134             @Override
       
   135             public Void run() {
       
   136                 r.run();
       
   137                 return null;
       
   138             }
       
   139         }, accessControlContext);
       
   140     }
       
   141 
       
   142     private void execute() {
       
   143         jvm.exclude(Thread.currentThread());
       
   144         try {
       
   145             updateStartNanos();
       
   146             process();
       
   147         } catch (IOException e) {
       
   148             if (!isClosed()) {
       
   149                 logException(e);
       
   150             }
       
   151         } catch (Exception e) {
       
   152             logException(e);
       
   153         } finally {
       
   154             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
       
   155         }
       
   156     }
       
   157 
       
   158     // User setting overrides default
       
   159     private void updateStartNanos() {
       
   160         if (startTime != null) {
       
   161             try {
       
   162                 setStartNanos(startTime.toEpochMilli() * 1_000_000L);
       
   163             } catch (ArithmeticException ae) {
       
   164                 setStartNanos(Long.MAX_VALUE);
       
   165             }
       
   166         }
       
   167     }
       
   168 
       
   169     private void logException(Exception e) {
       
   170         e.printStackTrace(); // for debugging purposes, remove before
       
   171         // integration
       
   172         Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
       
   173     }
       
   174 
       
   175     public abstract void process() throws IOException;
       
   176 
       
   177     public synchronized boolean remove(Object action) {
       
   178         boolean remove = false;
       
   179         Runnable[] updatedFlushActions = removeAction(flushActions, action);
       
   180         if (updatedFlushActions != null) {
       
   181             flushActionsHandle.setVolatile(this, updatedFlushActions);
       
   182             remove = true;
       
   183         }
       
   184         Runnable[] updatedCloseActions = removeAction(closeActions, action);
       
   185         if (updatedCloseActions != null) {
       
   186             closeActionsHandle.setVolatile(this, updatedCloseActions);
       
   187             remove = true;
       
   188         }
       
   189 
       
   190         boolean removeConsumer = false;
       
   191         List<EventDispatcher> list = new ArrayList<>();
       
   192         for (int i = 0; i < consumers.length; i++) {
       
   193             if (consumers[i].action != action) {
       
   194                 list.add(consumers[i]);
       
   195             } else {
       
   196                 removeConsumer = true;
       
   197                 remove = true;
       
   198             }
       
   199         }
       
   200         if (removeConsumer) {
       
   201             EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]);
       
   202             eventFilter = buildFilter(array);
       
   203             consumersHandle.setVolatile(this, array);
       
   204             dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
       
   205                                                                  // dispatch
       
   206         }
       
   207         return remove;
       
   208     }
       
   209 
       
   210     public void dispatch(RecordedEvent e) {
       
   211         if (e.endTime < startNanos) {
       
   212             return;
       
   213         }
       
   214 
       
   215         EventDispatcher[] consumerDispatch = dispatcher.get(e.getEventType().getId());
       
   216         if (consumerDispatch == null) {
       
   217             consumerDispatch = EventDispatcher.NO_DISPATCHERS;
       
   218             for (EventDispatcher ec : consumers.clone()) {
       
   219                 if (ec.accepts(e.getEventType())) {
       
   220                     consumerDispatch = merge(consumerDispatch, ec);
       
   221                 }
       
   222             }
       
   223             dispatcher.put(e.getEventType().getId(), consumerDispatch);
       
   224         }
       
   225         for (int i = 0; i < consumerDispatch.length; i++) {
       
   226             try {
       
   227                 consumerDispatch[i].offer(e);
       
   228             } catch (Exception exception) {
       
   229                 // Is this a reasonable behavior for an exception?
       
   230                 // Error will abort the stream.
       
   231             }
       
   232         }
       
   233 
       
   234     }
       
   235 
       
   236     public void onEvent(Consumer<RecordedEvent> action) {
       
   237         add(new EventDispatcher(action));
       
   238     }
       
   239 
       
   240     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
       
   241         add(new EventDispatcher(eventName, action));
       
   242     }
       
   243 
       
   244     InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
       
   245         InternalEventFilter ef = new InternalEventFilter();
       
   246         for (EventDispatcher ed : dispatchers) {
       
   247             String name = ed.eventName;
       
   248             if (name == null) {
       
   249                 return InternalEventFilter.ACCEPT_ALL;
       
   250             }
       
   251             ef.setThreshold(name, 0);
       
   252         }
       
   253         return ef.threadSafe();
       
   254     }
       
   255 
       
   256     private synchronized void add(EventDispatcher e) {
       
   257         EventDispatcher[] dispatchers = merge(consumers, e);
       
   258         eventFilter = buildFilter(dispatchers);
       
   259         consumersHandle.setVolatile(this, dispatchers);
       
   260         dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
       
   261     }
       
   262 
       
   263     public synchronized void onFlush(Runnable action) {
       
   264         flushActionsHandle.setVolatile(this, addAction(flushActions, action));
       
   265     }
       
   266 
       
   267     public synchronized void addCloseAction(Runnable action) {
       
   268         closeActionsHandle.setVolatile(this, addAction(closeActions, action));
       
   269     }
       
   270 
       
   271     public void setClosed(boolean closed) {
       
   272         closedHandle.setVolatile(this, closed);
       
   273     }
       
   274 
       
   275     final public boolean isClosed() {
       
   276         return closed;
       
   277     }
       
   278 
       
   279     public void runCloseActions() {
       
   280 
       
   281         Runnable[] cas = this.closeActions;
       
   282         for (int i = 0; i < cas.length; i++) {
       
   283             cas[i].run();
       
   284         }
       
   285     }
       
   286 
       
   287     public void runFlushActions() {
       
   288         Runnable[] fas = this.flushActions;
       
   289         for (int i = 0; i < fas.length; i++) {
       
   290             fas[i].run();
       
   291         }
       
   292     }
       
   293 
       
   294     public void startAsync(long startNanos) {
       
   295         if (started) {
       
   296             throw new IllegalStateException("Event stream can only be started once");
       
   297         }
       
   298         started = true;
       
   299         setStartNanos(startNanos);
       
   300         thread = new Thread(this);
       
   301         thread.setDaemon(true);
       
   302         thread.start();
       
   303     }
       
   304 
       
   305     public void start(long startNanos) {
       
   306         synchronized (this) {
       
   307             if (started) {
       
   308                 throw new IllegalStateException("Event stream can only be started once");
       
   309             }
       
   310             started = true;
       
   311             setStartNanos(startNanos);
       
   312         }
       
   313         run();
       
   314     }
       
   315 
       
   316     public void awaitTermination(Duration timeout) {
       
   317         Objects.requireNonNull(timeout);
       
   318         Thread t = null;
       
   319         synchronized (this) {
       
   320             t = thread;
       
   321         }
       
   322         if (t != null && t != Thread.currentThread()) {
       
   323             try {
       
   324                 t.join(timeout.toMillis());
       
   325             } catch (InterruptedException e) {
       
   326                 // ignore
       
   327             }
       
   328         }
       
   329     }
       
   330 
       
   331     public void awaitTermination() {
       
   332         awaitTermination(Duration.ofMillis(0));
       
   333     }
       
   334 
       
   335     private void setStartNanos(long startNanos) {
       
   336         this.startNanos = startNanos;
       
   337     }
       
   338 
       
   339     protected static EventDispatcher[] merge(EventDispatcher[] current, EventDispatcher add) {
       
   340         EventDispatcher[] array = new EventDispatcher[current.length + 1];
       
   341         System.arraycopy(current, 0, array, 0, current.length);
       
   342         array[current.length] = add;
       
   343         return array;
       
   344     }
       
   345 
       
   346     private static Runnable[] removeAction(Runnable[] array, Object action) {
       
   347         if (array.length == 0) {
       
   348             return null;
       
   349         }
       
   350         boolean remove = false;
       
   351         List<Runnable> list = new ArrayList<>();
       
   352         for (int i = 0; i < array.length; i++) {
       
   353             if (array[i] != action) {
       
   354                 list.add(array[i]);
       
   355             } else {
       
   356                 remove = true;
       
   357             }
       
   358         }
       
   359         if (remove) {
       
   360             return list.toArray(new Runnable[list.size()]);
       
   361         }
       
   362         return null;
       
   363     }
       
   364 
       
   365     private static Runnable[] addAction(Runnable[] array, Runnable action) {
       
   366         ArrayList<Runnable> a = new ArrayList<>();
       
   367         a.addAll(Arrays.asList(array));
       
   368         a.add(action);
       
   369         return a.toArray(new Runnable[0]);
       
   370     }
       
   371 
       
   372     abstract public void close();
       
   373 
       
   374     public void setReuse(boolean reuse) {
       
   375         reuseHandle.setVolatile(this, reuse);
       
   376     }
       
   377 
       
   378     public void setOrdered(boolean ordered) {
       
   379         orderedHandle.setVolatile(this, ordered);
       
   380     }
       
   381 
       
   382     public void setStartTime(Instant startTime) {
       
   383         Objects.nonNull(startTime);
       
   384         if (started) {
       
   385             throw new IllegalStateException("Stream is already started");
       
   386         }
       
   387         if (startTime.isBefore(Instant.EPOCH)) {
       
   388             startTime = Instant.EPOCH;
       
   389         }
       
   390         startTimeHandle.setVolatile(this, startTime);
       
   391     }
       
   392 
       
   393 }