src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/Dispatcher.java
author egahlin
Wed, 18 Sep 2019 03:45:46 +0200
branchJEP-349-branch
changeset 58197 0ef79bd7fb5c
parent 58153 0f7562601338
child 58200 2d147d680311
permissions -rw-r--r--
Remove unused methods etc.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
58145
bc54ed8d908a Move implementation into jdk.jfr.internal.consumer
egahlin
parents: 58129
diff changeset
     1
package jdk.jfr.internal.consumer;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     2
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     3
import java.time.Instant;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     4
import java.util.ArrayList;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     5
import java.util.List;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     6
import java.util.function.Consumer;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     7
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
     8
import jdk.jfr.EventType;
58145
bc54ed8d908a Move implementation into jdk.jfr.internal.consumer
egahlin
parents: 58129
diff changeset
     9
import jdk.jfr.consumer.RecordedEvent;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    10
import jdk.jfr.internal.LongMap;
58145
bc54ed8d908a Move implementation into jdk.jfr.internal.consumer
egahlin
parents: 58129
diff changeset
    11
import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    12
58145
bc54ed8d908a Move implementation into jdk.jfr.internal.consumer
egahlin
parents: 58129
diff changeset
    13
final class Dispatcher {
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    14
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    15
    final static class EventDispatcher {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    16
        private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    17
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    18
        private final String eventName;
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    19
        private final Consumer<RecordedEvent> action;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    20
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    21
        public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    22
            this.eventName = eventName;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    23
            this.action = action;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    24
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    25
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    26
        private void offer(RecordedEvent event) {
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    27
            action.accept(event);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    28
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    29
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    30
        private boolean accepts(EventType eventType) {
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    31
            return (eventName == null || eventType.getName().equals(eventName));
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    32
        }
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    33
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    34
        public Consumer<RecordedEvent> getAction() {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    35
            return action;
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    36
        }
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    37
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    38
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    39
    private final Consumer<Throwable>[] errorActions;
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    40
    private final Runnable[] flushActions;
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    41
    private final Runnable[] closeActions;
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    42
    private final EventDispatcher[] dispatchers;
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    43
    private final LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    44
    final ParserConfiguration parserConfiguration;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    45
    final Instant startTime;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    46
    final Instant endTime;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    47
    final long startNanos;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    48
    final long endNanos;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    49
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    50
    // Cache
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    51
    private EventType cacheEventType;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    52
    private EventDispatcher[] cacheDispatchers;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    53
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    54
    @SuppressWarnings({"unchecked","rawtypes"})
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    55
    public Dispatcher(StreamConfiguration c) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    56
        this.flushActions = c.flushActions.toArray(new Runnable[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    57
        this.closeActions = c.closeActions.toArray(new Runnable[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    58
        this.errorActions = c.errorActions.toArray(new Consumer[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    59
        this.dispatchers = c.eventActions.toArray(new EventDispatcher[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    60
        this.parserConfiguration = new ParserConfiguration(0, Long.MAX_VALUE, c.reuse, c.ordered, buildFilter(dispatchers));
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    61
        this.startTime = c.startTime;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    62
        this.endTime = c.endTime;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    63
        this.startNanos = c.startNanos;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    64
        this.endNanos = c.endNanos;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    65
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    66
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    67
    public void runFlushActions() {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    68
        Runnable[] flushActions = this.flushActions;
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    69
        for (int i = 0; i < flushActions.length; i++) {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    70
            try {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    71
                flushActions[i].run();
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    72
            } catch (Exception e) {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    73
                handleError(e);
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    74
            }
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    75
        }
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    76
    }
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    77
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    78
    public void runCloseActions() {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    79
        Runnable[] closeActions = this.closeActions;
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    80
        for (int i = 0; i < closeActions.length; i++) {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    81
            try {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    82
                closeActions[i].run();
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    83
            } catch (Exception e) {
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    84
                handleError(e);
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    85
            }
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    86
        }
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    87
    }
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
    88
58146
9f3aadcaa430 Rename InternalEventFulter to ParserFilter
egahlin
parents: 58145
diff changeset
    89
    private static ParserFilter buildFilter(EventDispatcher[] dispatchers) {
9f3aadcaa430 Rename InternalEventFulter to ParserFilter
egahlin
parents: 58145
diff changeset
    90
        ParserFilter ef = new ParserFilter();
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    91
        for (EventDispatcher ed : dispatchers) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    92
            String name = ed.eventName;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    93
            if (name == null) {
58146
9f3aadcaa430 Rename InternalEventFulter to ParserFilter
egahlin
parents: 58145
diff changeset
    94
                return ParserFilter.ACCEPT_ALL;
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    95
            }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    96
            ef.setThreshold(name, 0);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    97
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    98
        return ef;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
    99
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   100
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
   101
    void dispatch(RecordedEvent event) {
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   102
        EventType type = event.getEventType();
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   103
        EventDispatcher[] dispatchers = null;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   104
        if (type == cacheEventType) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   105
            dispatchers = cacheDispatchers;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   106
        } else {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   107
            dispatchers = dispatcherLookup.get(type.getId());
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   108
            if (dispatchers == null) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   109
                List<EventDispatcher> list = new ArrayList<>();
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   110
                for (EventDispatcher e : this.dispatchers) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   111
                    if (e.accepts(type)) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   112
                        list.add(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   113
                    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   114
                }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   115
                dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   116
                dispatcherLookup.put(type.getId(), dispatchers);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   117
            }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   118
            cacheDispatchers = dispatchers;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   119
        }
58153
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   120
        // Expected behavior if exception occurs in onEvent:
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   121
        //
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   122
        // Synchronous:
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   123
        //  - User has added onError action:
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   124
        //     Catch exception, call onError and continue with next event
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   125
        //     Let Errors propagate to caller of EventStream::start
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   126
        //  - Default action
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   127
        //     Catch exception, e.printStackTrace() and continue with next event
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   128
        //     Let Errors propagate to caller of EventStream::start
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   129
        //
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   130
        // Asynchronous
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   131
        //  - User has added onError action
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   132
        //     Catch exception, call onError and continue with next event
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   133
        //     Let Errors propagate, shutdown thread and stream
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   134
        //  - Default action
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   135
        //    Catch exception, e.printStackTrace() and continue with next event
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   136
        //    Let Errors propagate and shutdown thread and stream
0f7562601338 Describe and implement exception handling policy
egahlin
parents: 58146
diff changeset
   137
        //
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   138
        for (int i = 0; i < dispatchers.length; i++) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   139
            try {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   140
                dispatchers[i].offer(event);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   141
            } catch (Exception e) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   142
                handleError(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   143
            }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   144
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   145
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   146
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
   147
    private void handleError(Throwable e) {
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   148
        Consumer<?>[] consumers = this.errorActions;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   149
        if (consumers.length == 0) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   150
            defaultErrorHandler(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   151
            return;
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   152
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   153
        for (int i = 0; i < consumers.length; i++) {
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   154
            @SuppressWarnings("unchecked")
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   155
            Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   156
            conusmer.accept(e);
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   157
        }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   158
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   159
58197
0ef79bd7fb5c Remove unused methods etc.
egahlin
parents: 58153
diff changeset
   160
    private void defaultErrorHandler(Throwable e) {
58129
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   161
        e.printStackTrace();
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   162
    }
7b751fe181a5 Restructure stream configuration
egahlin
parents:
diff changeset
   163
}