src/jdk.jfr/share/classes/jdk/jfr/consumer/Dispatcher.java
branch JEP-349-branch
changeset 58129 7b751fe181a5
equal deleted inserted replaced
58121:6f8f18ac1d54 58129:7b751fe181a5
       
     1 package jdk.jfr.consumer;
       
     2 
       
     3 import java.time.Instant;
       
     4 import java.util.ArrayList;
       
     5 import java.util.List;
       
     6 import java.util.function.Consumer;
       
     7 
       
     8 import jdk.jfr.EventType;
       
     9 import jdk.jfr.consumer.ChunkParser.ParserConfiguration;
       
    10 import jdk.jfr.internal.LongMap;
       
    11 import jdk.jfr.internal.consumer.InternalEventFilter;
       
    12 
       
    13 public final class Dispatcher {
       
    14 
       
    15     public final static class EventDispatcher {
       
    16         final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
       
    17         final String eventName;
       
    18         final Consumer<RecordedEvent> action;
       
    19 
       
    20         public EventDispatcher(Consumer<RecordedEvent> action) {
       
    21             this(null, action);
       
    22         }
       
    23 
       
    24         public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
       
    25             this.eventName = eventName;
       
    26             this.action = action;
       
    27         }
       
    28 
       
    29         public void offer(RecordedEvent event) {
       
    30             action.accept(event);
       
    31         }
       
    32 
       
    33         public boolean accepts(EventType eventType) {
       
    34             return (eventName == null || eventType.getName().equals(eventName));
       
    35         }
       
    36     }
       
    37 
       
    38     final Consumer<Throwable>[] errorActions;
       
    39     final Runnable[] flushActions;
       
    40     final Runnable[] closeActions;
       
    41     final EventDispatcher[] dispatchers;
       
    42     final LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
       
    43     final ParserConfiguration parserConfiguration;
       
    44     final Instant startTime;
       
    45     final Instant endTime;
       
    46     final long startNanos;
       
    47     final long endNanos;
       
    48 
       
    49     // Cache
       
    50     private EventType cacheEventType;
       
    51     private EventDispatcher[] cacheDispatchers;
       
    52 
       
    53     @SuppressWarnings({"unchecked","rawtypes"})
       
    54     public Dispatcher(StreamConfiguration c) {
       
    55         this.flushActions = c.flushActions.toArray(new Runnable[0]);
       
    56         this.closeActions = c.closeActions.toArray(new Runnable[0]);
       
    57         this.errorActions = c.errorActions.toArray(new Consumer[0]);
       
    58         this.dispatchers = c.eventActions.toArray(new EventDispatcher[0]);
       
    59         this.parserConfiguration = new ParserConfiguration(0, Long.MAX_VALUE, c.reuse, c.ordered, buildFilter(dispatchers));
       
    60         this.startTime = c.startTime;
       
    61         this.endTime = c.endTime;
       
    62         this.startNanos = c.startNanos;
       
    63         this.endNanos = c.endNanos;
       
    64     }
       
    65 
       
    66     private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
       
    67         InternalEventFilter ef = new InternalEventFilter();
       
    68         for (EventDispatcher ed : dispatchers) {
       
    69             String name = ed.eventName;
       
    70             if (name == null) {
       
    71                 return InternalEventFilter.ACCEPT_ALL;
       
    72             }
       
    73             ef.setThreshold(name, 0);
       
    74         }
       
    75         return ef;
       
    76     }
       
    77 
       
    78     protected final void dispatch(RecordedEvent event) {
       
    79         EventType type = event.getEventType();
       
    80         EventDispatcher[] dispatchers = null;
       
    81         if (type == cacheEventType) {
       
    82             dispatchers = cacheDispatchers;
       
    83         } else {
       
    84             dispatchers = dispatcherLookup.get(type.getId());
       
    85             if (dispatchers == null) {
       
    86                 List<EventDispatcher> list = new ArrayList<>();
       
    87                 for (EventDispatcher e : this.dispatchers) {
       
    88                     if (e.accepts(type)) {
       
    89                         list.add(e);
       
    90                     }
       
    91                 }
       
    92                 dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
       
    93                 dispatcherLookup.put(type.getId(), dispatchers);
       
    94             }
       
    95             cacheDispatchers = dispatchers;
       
    96         }
       
    97         for (int i = 0; i < dispatchers.length; i++) {
       
    98             try {
       
    99                 dispatchers[i].offer(event);
       
   100             } catch (Exception e) {
       
   101                 handleError(e);
       
   102             }
       
   103         }
       
   104     }
       
   105 
       
   106     public void handleError(Throwable e) {
       
   107         Consumer<?>[] consumers = this.errorActions;
       
   108         if (consumers.length == 0) {
       
   109             defaultErrorHandler(e);
       
   110             return;
       
   111         }
       
   112         for (int i = 0; i < consumers.length; i++) {
       
   113             @SuppressWarnings("unchecked")
       
   114             Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
       
   115             conusmer.accept(e);
       
   116         }
       
   117     }
       
   118 
       
   119     public void runFlushActions() {
       
   120         Runnable[] flushActions = this.flushActions;
       
   121         for (int i = 0; i < flushActions.length; i++) {
       
   122             try {
       
   123                 flushActions[i].run();
       
   124             } catch (Exception e) {
       
   125                 handleError(e);
       
   126             }
       
   127         }
       
   128     }
       
   129 
       
   130     public void runCloseActions() {
       
   131         Runnable[] closeActions = this.closeActions;
       
   132         for (int i = 0; i < closeActions.length; i++) {
       
   133             try {
       
   134                 closeActions[i].run();
       
   135             } catch (Exception e) {
       
   136                 handleError(e);
       
   137             }
       
   138         }
       
   139     }
       
   140 
       
   141     void defaultErrorHandler(Throwable e) {
       
   142         e.printStackTrace();
       
   143     }
       
   144 }