src/jdk.jfr/share/classes/jdk/jfr/consumer/EventRunner.java
branchJEP-349-branch
changeset 57361 53dccc90a5be
equal deleted inserted replaced
57360:5d043a159d5c 57361:53dccc90a5be
       
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.jfr.consumer;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.invoke.MethodHandles;
       
    30 import java.lang.invoke.VarHandle;
       
    31 import java.security.AccessControlContext;
       
    32 import java.security.AccessController;
       
    33 import java.security.PrivilegedAction;
       
    34 import java.util.ArrayList;
       
    35 import java.util.Arrays;
       
    36 import java.util.List;
       
    37 
       
    38 import jdk.jfr.consumer.EventDirectoryStream.EventConsumer;
       
    39 import jdk.jfr.internal.JVM;
       
    40 import jdk.jfr.internal.LogLevel;
       
    41 import jdk.jfr.internal.LogTag;
       
    42 import jdk.jfr.internal.Logger;
       
    43 
       
    44 class EventRunner implements Runnable {
       
    45     private final static VarHandle closedHandle;
       
    46     private final static VarHandle consumersHandle;
       
    47     private final static VarHandle dispatcherHandle;
       
    48     private final static VarHandle flushActionsHandle;
       
    49     private final static VarHandle closeActionsHandle;
       
    50     static {
       
    51         try {
       
    52             MethodHandles.Lookup l = MethodHandles.lookup();
       
    53             closedHandle = l.findVarHandle(EventRunner.class, "closed", boolean.class);
       
    54             consumersHandle = l.findVarHandle(EventRunner.class, "consumers", EventConsumer[].class);
       
    55             dispatcherHandle = l.findVarHandle(EventRunner.class, "dispatcher", LongMap.class);
       
    56             flushActionsHandle = l.findVarHandle(EventRunner.class, "flushActions", Runnable[].class);
       
    57             closeActionsHandle = l.findVarHandle(EventRunner.class, "closeActions", Runnable[].class);
       
    58         } catch (ReflectiveOperationException e) {
       
    59             throw new InternalError(e);
       
    60         }
       
    61     }
       
    62     // set by VarHandle
       
    63     private boolean closed;
       
    64     // set by VarHandle
       
    65     private EventConsumer[] consumers = new EventConsumer[0];
       
    66     // set by VarHandle
       
    67     private LongMap<EventConsumer[]> dispatcher = new LongMap<>();
       
    68     // set by VarHandle
       
    69     private Runnable[] flushActions = new Runnable[0];
       
    70     // set by VarHandle
       
    71     private Runnable[] closeActions = new Runnable[0];
       
    72 
       
    73     private final static JVM jvm = JVM.getJVM();
       
    74     private final static EventConsumer[] NO_CONSUMERS = new EventConsumer[0];
       
    75     private final AccessControlContext accessControlContext;
       
    76     private EventSetLocation location;
       
    77     private EventSet eventSet;
       
    78     private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
       
    79     private int eventSetIndex;
       
    80     private int eventArrayIndex;
       
    81     private RecordedEvent[] currentEventArray = new RecordedEvent[0];
       
    82     private volatile long startNanos;
       
    83 
       
    84     public EventRunner(AccessControlContext acc) throws IOException {
       
    85         this.accessControlContext = acc;
       
    86     }
       
    87 
       
    88     public void run() {
       
    89         doPriviliged(() -> execute());
       
    90     }
       
    91 
       
    92     void doPriviliged(Runnable r) {
       
    93         AccessController.doPrivileged(new PrivilegedAction<Void>() {
       
    94             @Override
       
    95             public Void run() {
       
    96                 r.run();
       
    97                 return null;
       
    98             }
       
    99         }, accessControlContext);
       
   100     }
       
   101 
       
   102     private void execute() {
       
   103         jvm.exclude(Thread.currentThread());
       
   104         try {
       
   105             process();
       
   106         } catch (Throwable e) {
       
   107             e.printStackTrace();
       
   108             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpectedexception iterating consumer.");
       
   109         } finally {
       
   110             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
       
   111         }
       
   112     }
       
   113 
       
   114     private void process() throws Exception, IOException {
       
   115         this.location = EventSetLocation.current();
       
   116         this.eventSet = location.acquire(startNanos, null); // use timestamp from
       
   117         if (eventSet == null) {
       
   118             return;
       
   119         }
       
   120         while (!closed) {
       
   121             processSegment();
       
   122             Runnable[] fas = this.flushActions;
       
   123             for (int i = 0; i < fas.length; i++) {
       
   124                 fas[i].run();
       
   125             }
       
   126             do {
       
   127                 if (closed) {
       
   128                     return;
       
   129                 }
       
   130                 currentEventArray = eventSet.readEvents(eventSetIndex);
       
   131                 if (currentEventArray == EventSet.END_OF_SET) {
       
   132                     eventSet = eventSet.next(eventFilter);
       
   133                     if (eventSet == null || closed) {
       
   134                         return;
       
   135                     }
       
   136                     eventSetIndex = 0;
       
   137                     continue;
       
   138                 }
       
   139                 if (currentEventArray == null) {
       
   140                     return; // no more events
       
   141                 }
       
   142                 eventSetIndex++;
       
   143             } while (currentEventArray.length == 0);
       
   144             eventArrayIndex = 0;
       
   145         }
       
   146     }
       
   147 
       
   148     private void processSegment() {
       
   149         while (eventArrayIndex < currentEventArray.length) {
       
   150             RecordedEvent e = currentEventArray[eventArrayIndex++];
       
   151             if (e == null) {
       
   152                return;
       
   153             }
       
   154             EventConsumer[] consumerDispatch = dispatcher.get(e.getEventType().getId());
       
   155             if (consumerDispatch == null) {
       
   156                 consumerDispatch = NO_CONSUMERS;
       
   157                 for (EventConsumer ec : consumers.clone()) {
       
   158                     if (ec.accepts(e.getEventType())) {
       
   159                         consumerDispatch = merge(consumerDispatch, ec);
       
   160                     }
       
   161                 }
       
   162                 dispatcher.put(e.getEventType().getId(), consumerDispatch);
       
   163             }
       
   164             for (int i = 0; i < consumerDispatch.length; i++) {
       
   165                 consumerDispatch[i].offer(e);
       
   166             }
       
   167         }
       
   168     }
       
   169 
       
   170     static EventConsumer[] merge(EventConsumer[] current, EventConsumer add) {
       
   171         EventConsumer[] array = new EventConsumer[current.length + 1];
       
   172         System.arraycopy(current, 0, array, 0, current.length);
       
   173         array[current.length] = add;
       
   174         return array;
       
   175     }
       
   176 
       
   177     public void add(EventConsumer e) {
       
   178         consumersHandle.setVolatile(this, merge(consumers, e));
       
   179         dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
       
   180     }
       
   181 
       
   182     private static Runnable[] removeAction(Runnable[] array, Object action)  {
       
   183         if (array.length == 0) {
       
   184             return null;
       
   185         }
       
   186         boolean remove = false;
       
   187         List<Runnable> list = new ArrayList<>();
       
   188         for (int i = 0; i < array.length; i++)  {
       
   189             if (array[i] != action) {
       
   190                 list.add(array[i]);
       
   191             } else {
       
   192                 remove = true;
       
   193             }
       
   194         }
       
   195         if (remove) {
       
   196             return list.toArray(new Runnable[list.size()]);
       
   197         }
       
   198         return null;
       
   199     }
       
   200 
       
   201     private static Runnable[] addAction(Runnable[] array, Runnable action)   {
       
   202         ArrayList<Runnable> a = new ArrayList<>();
       
   203         a.addAll(Arrays.asList(array));
       
   204         a.add(action);
       
   205         return a.toArray(new Runnable[0]);
       
   206     }
       
   207 
       
   208     public boolean remove(Object action) {
       
   209         boolean remove = false;
       
   210         Runnable[] updatedFlushActions = removeAction(flushActions, action);
       
   211         if (updatedFlushActions != null) {
       
   212             flushActionsHandle.setVolatile(this, updatedFlushActions);
       
   213             remove = true;
       
   214         }
       
   215         Runnable[] updatedCloseActions = removeAction(closeActions, action);
       
   216         if (updatedCloseActions != null) {
       
   217             closeActionsHandle.setVolatile(this, updatedCloseActions);
       
   218             remove = true;
       
   219         }
       
   220 
       
   221         boolean removeConsumer = false;
       
   222         List<EventConsumer> list = new ArrayList<>();
       
   223         for (int i = 0; i < consumers.length; i++) {
       
   224             if (consumers[i].action != action) {
       
   225                 list.add(consumers[i]);
       
   226             } else {
       
   227                 removeConsumer = true;
       
   228                 remove = true;
       
   229             }
       
   230         }
       
   231         if (removeConsumer) {
       
   232             EventConsumer[] array = list.toArray(new EventConsumer[list.size()]);
       
   233             consumersHandle.setVolatile(this, array);
       
   234             dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset dispatch
       
   235         }
       
   236         return remove;
       
   237     }
       
   238 
       
   239     public void addFlush(Runnable action) {
       
   240         flushActionsHandle.setVolatile(this, addAction(flushActions, action));
       
   241     }
       
   242 
       
   243     public void close() {
       
   244         closedHandle.setVolatile(this, true);
       
   245         // TODO: Data races here, must fix
       
   246         if (eventSet != null) {
       
   247             eventSet.release(null);
       
   248         }
       
   249         if (location != null) {
       
   250             location.release();
       
   251         }
       
   252 
       
   253         Runnable[] cas = this.closeActions;
       
   254         for (int i = 0; i < cas.length; i++) {
       
   255             cas[i].run();
       
   256         }
       
   257     }
       
   258 
       
   259     public void addCloseAction(Runnable action) {
       
   260         closeActionsHandle.setVolatile(this, addAction(closeActions, action));
       
   261     }
       
   262 
       
   263     public void setStartNanos(long startNanos) {
       
   264         this.startNanos = startNanos;
       
   265     }
       
   266 }