src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/Dispatcher.java
author egahlin
Mon, 16 Sep 2019 09:57:26 +0200
branchJEP-349-branch
changeset 58146 9f3aadcaa430
parent 58145 bc54ed8d908a
child 58153 0f7562601338
permissions -rw-r--r--
Rename InternalEventFulter to ParserFilter
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
58145
bc54ed8d908a Move implementation into jdk.jfr.internal.consumer
egahlin
parents: 58129
diff changeset
     1
package jdk.jfr.internal.consumer;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     2
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     3
import java.time.Instant;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     4
import java.util.ArrayList;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     5
import java.util.List;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     6
import java.util.function.Consumer;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     7
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     8
import jdk.jfr.EventType;
58145
bc54ed8d908a Move implementation into jdk.jfr.internal.consumer
egahlin
parents: 58129
diff changeset
     9
import jdk.jfr.consumer.RecordedEvent;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    10
import jdk.jfr.internal.LongMap;
58145
bc54ed8d908a Move implementation into jdk.jfr.internal.consumer
egahlin
parents: 58129
diff changeset
    11
import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    12
58145
bc54ed8d908a Move implementation into jdk.jfr.internal.consumer
egahlin
parents: 58129
diff changeset
    13
final class Dispatcher {
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    14
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    15
    public final static class EventDispatcher {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    16
        final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    17
        final String eventName;
58145
bc54ed8d908a Move implementation into jdk.jfr.internal.consumer
egahlin
parents: 58129
diff changeset
    18
        public final Consumer<RecordedEvent> action;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    19
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    20
        public EventDispatcher(Consumer<RecordedEvent> action) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    21
            this(null, action);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    22
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    23
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    24
        public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    25
            this.eventName = eventName;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    26
            this.action = action;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    27
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    28
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    29
        public void offer(RecordedEvent event) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    30
            action.accept(event);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    31
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    32
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    33
        public boolean accepts(EventType eventType) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    34
            return (eventName == null || eventType.getName().equals(eventName));
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    35
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    36
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    37
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    38
    final Consumer<Throwable>[] errorActions;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    39
    final Runnable[] flushActions;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    40
    final Runnable[] closeActions;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    41
    final EventDispatcher[] dispatchers;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    42
    final LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    43
    final ParserConfiguration parserConfiguration;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    44
    final Instant startTime;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    45
    final Instant endTime;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    46
    final long startNanos;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    47
    final long endNanos;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    48
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    49
    // Cache
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    50
    private EventType cacheEventType;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    51
    private EventDispatcher[] cacheDispatchers;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    52
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    53
    @SuppressWarnings({"unchecked","rawtypes"})
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    54
    public Dispatcher(StreamConfiguration c) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    55
        this.flushActions = c.flushActions.toArray(new Runnable[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    56
        this.closeActions = c.closeActions.toArray(new Runnable[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    57
        this.errorActions = c.errorActions.toArray(new Consumer[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    58
        this.dispatchers = c.eventActions.toArray(new EventDispatcher[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    59
        this.parserConfiguration = new ParserConfiguration(0, Long.MAX_VALUE, c.reuse, c.ordered, buildFilter(dispatchers));
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    60
        this.startTime = c.startTime;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    61
        this.endTime = c.endTime;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    62
        this.startNanos = c.startNanos;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    63
        this.endNanos = c.endNanos;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    64
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    65
58146
9f3aadcaa430 Rename InternalEventFulter to ParserFilter
egahlin
parents: 58145
diff changeset
    66
    private static ParserFilter buildFilter(EventDispatcher[] dispatchers) {
9f3aadcaa430 Rename InternalEventFulter to ParserFilter
egahlin
parents: 58145
diff changeset
    67
        ParserFilter ef = new ParserFilter();
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    68
        for (EventDispatcher ed : dispatchers) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    69
            String name = ed.eventName;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    70
            if (name == null) {
58146
9f3aadcaa430 Rename InternalEventFulter to ParserFilter
egahlin
parents: 58145
diff changeset
    71
                return ParserFilter.ACCEPT_ALL;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    72
            }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    73
            ef.setThreshold(name, 0);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    74
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    75
        return ef;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    76
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    77
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    78
    protected final void dispatch(RecordedEvent event) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    79
        EventType type = event.getEventType();
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    80
        EventDispatcher[] dispatchers = null;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    81
        if (type == cacheEventType) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    82
            dispatchers = cacheDispatchers;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    83
        } else {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    84
            dispatchers = dispatcherLookup.get(type.getId());
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    85
            if (dispatchers == null) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    86
                List<EventDispatcher> list = new ArrayList<>();
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    87
                for (EventDispatcher e : this.dispatchers) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    88
                    if (e.accepts(type)) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    89
                        list.add(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    90
                    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    91
                }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    92
                dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    93
                dispatcherLookup.put(type.getId(), dispatchers);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    94
            }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    95
            cacheDispatchers = dispatchers;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    96
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    97
        for (int i = 0; i < dispatchers.length; i++) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    98
            try {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    99
                dispatchers[i].offer(event);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   100
            } catch (Exception e) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   101
                handleError(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   102
            }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   103
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   104
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   105
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   106
    public void handleError(Throwable e) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   107
        Consumer<?>[] consumers = this.errorActions;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   108
        if (consumers.length == 0) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   109
            defaultErrorHandler(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   110
            return;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   111
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   112
        for (int i = 0; i < consumers.length; i++) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   113
            @SuppressWarnings("unchecked")
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   114
            Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   115
            conusmer.accept(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   116
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   117
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   118
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   119
    public void runFlushActions() {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   120
        Runnable[] flushActions = this.flushActions;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   121
        for (int i = 0; i < flushActions.length; i++) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   122
            try {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   123
                flushActions[i].run();
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   124
            } catch (Exception e) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   125
                handleError(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   126
            }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   127
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   128
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   130
    public void runCloseActions() {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   131
        Runnable[] closeActions = this.closeActions;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   132
        for (int i = 0; i < closeActions.length; i++) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   133
            try {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   134
                closeActions[i].run();
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   135
            } catch (Exception e) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   136
                handleError(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   137
            }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   138
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   139
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   140
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   141
    void defaultErrorHandler(Throwable e) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   142
        e.printStackTrace();
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   143
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   144
}