src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
branch JEP-349-branch
changeset 58020 f082177c5023
parent 57985 be121cbf3284
child 58129 7b751fe181a5
equal deleted inserted replaced
57997:8dea18a54031 58020:f082177c5023
    31 import java.security.AccessController;
    31 import java.security.AccessController;
    32 import java.security.PrivilegedAction;
    32 import java.security.PrivilegedAction;
    33 import java.time.Duration;
    33 import java.time.Duration;
    34 import java.time.Instant;
    34 import java.time.Instant;
    35 import java.util.ArrayList;
    35 import java.util.ArrayList;
    36 import java.util.Arrays;
       
    37 import java.util.Comparator;
    36 import java.util.Comparator;
    38 import java.util.List;
    37 import java.util.List;
    39 import java.util.Objects;
    38 import java.util.Objects;
       
    39 import java.util.concurrent.atomic.AtomicLong;
    40 import java.util.function.Consumer;
    40 import java.util.function.Consumer;
    41 
    41 
    42 import jdk.jfr.EventType;
    42 import jdk.jfr.EventType;
    43 import jdk.jfr.internal.JVM;
    43 import jdk.jfr.internal.JVM;
    44 import jdk.jfr.internal.LogLevel;
    44 import jdk.jfr.internal.LogLevel;
    45 import jdk.jfr.internal.LogTag;
    45 import jdk.jfr.internal.LogTag;
    46 import jdk.jfr.internal.Logger;
    46 import jdk.jfr.internal.Logger;
    47 import jdk.jfr.internal.LongMap;
       
    48 import jdk.jfr.internal.SecuritySupport;
    47 import jdk.jfr.internal.SecuritySupport;
    49 import jdk.jfr.internal.Utils;
       
    50 import jdk.jfr.internal.consumer.InternalEventFilter;
       
    51 
    48 
    52 /*
    49 /*
    53  * Purpose of this class is to simplify the implementation of
    50  * Purpose of this class is to simplify the implementation of
    54  * an event stream. In particular, it handles:
    51  * an event stream. In particular, it handles:
    55  *
    52  *
    60  * - security
    57  * - security
    61  *
    58  *
    62  */
    59  */
    63 abstract class AbstractEventStream implements EventStream {
    60 abstract class AbstractEventStream implements EventStream {
    64 
    61 
    65     protected static final class StreamConfiguration {
    62     final static class EventDispatcher {
    66         private static final Runnable[] NO_ACTIONS = new Runnable[0];
       
    67 
       
    68         private Consumer<?>[] errorActions = new Consumer<?>[0];
       
    69         private Runnable[] flushActions = NO_ACTIONS;
       
    70         private Runnable[] closeActions = NO_ACTIONS;
       
    71         private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
       
    72         private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
       
    73         private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
       
    74 
       
    75         private boolean changedConfiguration = false;
       
    76         private boolean closed = false;
       
    77         private boolean reuse = true;
       
    78         private boolean ordered = true;
       
    79         private Instant startTime = null;
       
    80         private Instant endTime = null;
       
    81         private boolean started = false;
       
    82         private long startNanos = 0;
       
    83         private long endNanos = Long.MAX_VALUE;
       
    84 
       
    85         public StreamConfiguration(StreamConfiguration configuration) {
       
    86             this.flushActions = configuration.flushActions;
       
    87             this.closeActions = configuration.closeActions;
       
    88             this.errorActions = configuration.errorActions;
       
    89             this.dispatchers = configuration.dispatchers;
       
    90             this.eventFilter = configuration.eventFilter;
       
    91             this.closed = configuration.closed;
       
    92             this.reuse = configuration.reuse;
       
    93             this.ordered = configuration.ordered;
       
    94             this.startTime = configuration.startTime;
       
    95             this.endTime = configuration.endTime;
       
    96             this.started = configuration.started;
       
    97             this.startNanos = configuration.startNanos;
       
    98             this.endNanos = configuration.endNanos;
       
    99             this.dispatcherLookup = configuration.dispatcherLookup;
       
   100         }
       
   101 
       
   102         public StreamConfiguration() {
       
   103         }
       
   104 
       
   105         public StreamConfiguration remove(Object action) {
       
   106             flushActions = remove(flushActions, action);
       
   107             closeActions = remove(closeActions, action);
       
   108             dispatchers = removeDispatch(dispatchers, action);
       
   109             return this;
       
   110         }
       
   111 
       
   112         public StreamConfiguration addDispatcher(EventDispatcher e) {
       
   113             dispatchers = add(dispatchers, e);
       
   114             eventFilter = buildFilter(dispatchers);
       
   115             dispatcherLookup = new LongMap<>();
       
   116             return this;
       
   117         }
       
   118 
       
   119         public StreamConfiguration addFlushAction(Runnable action) {
       
   120             flushActions = add(flushActions, action);
       
   121             return this;
       
   122         }
       
   123 
       
   124         public StreamConfiguration addCloseAction(Runnable action) {
       
   125             closeActions = add(closeActions, action);
       
   126             return this;
       
   127         }
       
   128 
       
   129         public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
       
   130             errorActions = add(errorActions, action);
       
   131             return this;
       
   132         }
       
   133 
       
   134         public StreamConfiguration setClosed(boolean closed) {
       
   135             this.closed = closed;
       
   136             changedConfiguration = true;
       
   137             return this;
       
   138         }
       
   139 
       
   140         public boolean isClosed() {
       
   141             return closed;
       
   142         }
       
   143 
       
   144         public Runnable[] getCloseActions() {
       
   145             return closeActions;
       
   146         }
       
   147 
       
   148         public Runnable[] getFlushActions() {
       
   149             return flushActions;
       
   150         }
       
   151 
       
   152         private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
       
   153             List<EventDispatcher> list = new ArrayList<>(array.length);
       
   154             boolean modified = false;
       
   155             for (int i = 0; i < array.length; i++) {
       
   156                 if (array[i].action != action) {
       
   157                     list.add(array[i]);
       
   158                 } else {
       
   159                     modified = true;
       
   160                 }
       
   161             }
       
   162             EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
       
   163             if (modified) {
       
   164                 eventFilter = buildFilter(result);
       
   165                 dispatcherLookup = new LongMap<>();
       
   166                 changedConfiguration = true;
       
   167             }
       
   168             return result;
       
   169         }
       
   170 
       
   171         private <T> T[] remove(T[] array, Object action) {
       
   172             List<T> list = new ArrayList<>(array.length);
       
   173             for (int i = 0; i < array.length; i++) {
       
   174                 if (array[i] != action) {
       
   175                     list.add(array[i]);
       
   176                 } else {
       
   177                     changedConfiguration = true;
       
   178                 }
       
   179             }
       
   180             return list.toArray(array);
       
   181         }
       
   182 
       
   183         private <T> T[] add(T[] array, T object) {
       
   184             List<T> list = new ArrayList<>(Arrays.asList(array));
       
   185             list.add(object);
       
   186             changedConfiguration = true;
       
   187             return list.toArray(array);
       
   188         }
       
   189 
       
   190         private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
       
   191             InternalEventFilter ef = new InternalEventFilter();
       
   192             for (EventDispatcher ed : dispatchers) {
       
   193                 String name = ed.eventName;
       
   194                 if (name == null) {
       
   195                     return InternalEventFilter.ACCEPT_ALL;
       
   196                 }
       
   197                 ef.setThreshold(name, 0);
       
   198             }
       
   199             return ef;
       
   200         }
       
   201 
       
   202         public StreamConfiguration setReuse(boolean reuse) {
       
   203             this.reuse = reuse;
       
   204             changedConfiguration = true;
       
   205             return this;
       
   206         }
       
   207 
       
   208         public StreamConfiguration setOrdered(boolean ordered) {
       
   209             this.ordered = ordered;
       
   210             changedConfiguration = true;
       
   211             return this;
       
   212         }
       
   213 
       
   214         public StreamConfiguration setEndTime(Instant endTime) {
       
   215             this.endTime = endTime;
       
   216             this.endNanos = Utils.timeToNanos(endTime);
       
   217             changedConfiguration = true;
       
   218             return this;
       
   219         }
       
   220 
       
   221         public StreamConfiguration setStartTime(Instant startTime) {
       
   222             this.startTime = startTime;
       
   223             this.startNanos = Utils.timeToNanos(startTime);
       
   224             changedConfiguration = true;
       
   225             return this;
       
   226         }
       
   227 
       
   228         public Instant getStartTime() {
       
   229             return startTime;
       
   230         }
       
   231 
       
   232         public Object getEndTime() {
       
   233             return endTime;
       
   234         }
       
   235 
       
   236         public boolean isStarted() {
       
   237             return started;
       
   238         }
       
   239 
       
   240         public StreamConfiguration setStartNanos(long startNanos) {
       
   241             this.startNanos = startNanos;
       
   242             changedConfiguration = true;
       
   243             return this;
       
   244         }
       
   245 
       
   246         public void setStarted(boolean started) {
       
   247             this.started = started;
       
   248             changedConfiguration = true;
       
   249         }
       
   250 
       
   251         public boolean hasChanged() {
       
   252             return changedConfiguration;
       
   253         }
       
   254 
       
   255         public boolean getReuse() {
       
   256             return reuse;
       
   257         }
       
   258 
       
   259         public boolean getOrdered() {
       
   260             return ordered;
       
   261         }
       
   262 
       
   263         public InternalEventFilter getFiler() {
       
   264             return eventFilter;
       
   265         }
       
   266 
       
   267         public long getStartNanos() {
       
   268             return startNanos;
       
   269         }
       
   270 
       
   271         public long getEndNanos() {
       
   272             return endNanos;
       
   273         }
       
   274 
       
   275         public InternalEventFilter getFilter() {
       
   276             return eventFilter;
       
   277         }
       
   278 
       
   279         public String toString() {
       
   280             StringBuilder sb = new StringBuilder();
       
   281             for (Runnable flush : flushActions) {
       
   282                 sb.append("Flush Action: ").append(flush).append("\n");
       
   283             }
       
   284             for (Runnable close : closeActions) {
       
   285                 sb.append("Close Action: " + close + "\n");
       
   286             }
       
   287             for (Consumer<?> error : errorActions) {
       
   288                 sb.append("Error Action: " + error + "\n");
       
   289             }
       
   290             for (EventDispatcher dispatcher : dispatchers) {
       
   291                 sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
       
   292             }
       
   293             sb.append("Closed: ").append(closed).append("\n");
       
   294             sb.append("Reuse: ").append(reuse).append("\n");
       
   295             sb.append("Ordered: ").append(ordered).append("\n");
       
   296             sb.append("Started: ").append(started).append("\n");
       
   297             sb.append("Start Time: ").append(startTime).append("\n");
       
   298             sb.append("Start Nanos: ").append(startNanos).append("\n");
       
   299             sb.append("End Time: ").append(endTime).append("\n");
       
   300             sb.append("End Nanos: ").append(endNanos).append("\n");
       
   301             return sb.toString();
       
   302         }
       
   303 
       
   304         private EventDispatcher[] getDispatchers() {
       
   305             return dispatchers;
       
   306         }
       
   307     }
       
   308 
       
   309     private final static class EventDispatcher {
       
   310         final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
    63         final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
   311         final private String eventName;
    64         final String eventName;
   312         final private Consumer<RecordedEvent> action;
    65         final Consumer<RecordedEvent> action;
   313 
    66 
   314         public EventDispatcher(Consumer<RecordedEvent> action) {
    67         public EventDispatcher(Consumer<RecordedEvent> action) {
   315             this(null, action);
    68             this(null, action);
   316         }
    69         }
   317 
    70 
   328             return (eventName == null || eventType.getName().equals(eventName));
    81             return (eventName == null || eventType.getName().equals(eventName));
   329         }
    82         }
   330     }
    83     }
   331 
    84 
   332     final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
    85     final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
   333 
    86     private final static AtomicLong counter = new AtomicLong(1);
   334     private final Thread thread;
    87     private volatile Thread thread;
       
    88     private final Object terminated = new Object();
   335     private final boolean active;
    89     private final boolean active;
   336     private final Runnable flushOperation = () -> runFlushActions();
    90     private final Runnable flushOperation = () -> runFlushActions();
   337     private final AccessControlContext accessControllerContext;
    91     private final AccessControlContext accessControllerContext;
   338     private final Object configurationLock = new Object();
    92     private final Object configurationLock = new Object();
   339 
    93 
   340     // Modified by updateConfiguration()
    94     // Modified by updateConfiguration()
   341     protected volatile StreamConfiguration configuration = new StreamConfiguration();
    95     protected volatile StreamConfiguration configuration = new StreamConfiguration();
   342 
    96 
   343     // Cache the last event type and dispatch.
       
   344     private EventType lastEventType;
       
   345     private EventDispatcher[] lastEventDispatch;
       
   346 
       
   347     public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
    97     public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
   348         this.accessControllerContext = Objects.requireNonNull(acc);
    98         this.accessControllerContext = Objects.requireNonNull(acc);
   349         this.active = active;
    99         this.active = active;
   350         this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc));
       
   351     }
   100     }
   352 
   101 
   353     @Override
   102     @Override
   354     abstract public void start();
   103     abstract public void start();
   355 
   104 
   358 
   107 
   359     @Override
   108     @Override
   360     abstract public void close();
   109     abstract public void close();
   361 
   110 
   362     // Purpose of synchronizing the following methods is
   111     // Purpose of synchronizing the following methods is
   363     // to serialize changes to the configuration, so only one
   112     // to serialize changes to the configuration so only one
   364     // thread at a time can change the configuration.
   113     // thread at a time can change the configuration.
   365     //
   114     //
   366     // The purpose is not to guard the configuration field. A new
   115     // The purpose is not to guard the configuration field. A new
   367     // configuration is published using updateConfiguration
   116     // configuration is published using updateConfiguration
   368     //
   117     //
   453             return updateConfiguration(new StreamConfiguration(configuration).remove(action));
   202             return updateConfiguration(new StreamConfiguration(configuration).remove(action));
   454         }
   203         }
   455     }
   204     }
   456 
   205 
   457     @Override
   206     @Override
   458     public final void awaitTermination() {
   207     public final void awaitTermination() throws InterruptedException {
   459         awaitTermination(Duration.ofMillis(0));
   208         awaitTermination(Duration.ofMillis(0));
   460     }
   209     }
   461 
   210 
   462     @Override
   211     @Override
   463     public final void awaitTermination(Duration timeout) {
   212     public final void awaitTermination(Duration timeout) throws InterruptedException {
   464         Objects.requireNonNull(timeout);
   213         Objects.requireNonNull(timeout);
   465         if (thread != Thread.currentThread()) {
   214         if (timeout.isNegative()) {
   466             try {
   215             throw new IllegalArgumentException("timeout value is negative");
   467                 thread.join(timeout.toMillis());
   216         }
   468             } catch (InterruptedException e) {
   217 
   469                 // ignore
   218         long base = System.currentTimeMillis();
       
   219         long now = 0;
       
   220 
       
   221         long millis;
       
   222         try {
       
   223             millis = Math.multiplyExact(timeout.getSeconds(), 1000);
       
   224         } catch (ArithmeticException a) {
       
   225             millis = Long.MAX_VALUE;
       
   226         }
       
   227         int nanos = timeout.toNanosPart();
       
   228         if (nanos == 0 && millis == 0) {
       
   229             synchronized (terminated) {
       
   230                 while (!isClosed()) {
       
   231                     terminated.wait(0);
       
   232                 }
       
   233             }
       
   234         } else {
       
   235             while (!isClosed()) {
       
   236                 long delay = millis - now;
       
   237                 if (delay <= 0) {
       
   238                     break;
       
   239                 }
       
   240                 synchronized (terminated) {
       
   241                     terminated.wait(delay, nanos);
       
   242                 }
       
   243                 now = System.currentTimeMillis() - base;
   470             }
   244             }
   471         }
   245         }
   472     }
   246     }
   473 
   247 
   474     protected abstract void process() throws Exception;
   248     protected abstract void process() throws Exception;
   475 
   249 
   476     protected final void clearLastDispatch() {
   250     protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
   477         lastEventDispatch = null;
       
   478         lastEventType = null;
       
   479     }
       
   480 
       
   481     protected final void dispatch(RecordedEvent event) {
       
   482         EventType type = event.getEventType();
   251         EventType type = event.getEventType();
   483         EventDispatcher[] dispatchers = null;
   252         EventDispatcher[] dispatchers = null;
   484         if (type == lastEventType) {
   253         if (type == c.cacheEventType) {
   485             dispatchers = lastEventDispatch;
   254             dispatchers = c.cacheDispatchers;
   486         } else {
   255         } else {
   487             dispatchers = configuration.dispatcherLookup.get(type.getId());
   256             dispatchers = c.dispatcherLookup.get(type.getId());
   488             if (dispatchers == null) {
   257             if (dispatchers == null) {
   489                 List<EventDispatcher> list = new ArrayList<>();
   258                 List<EventDispatcher> list = new ArrayList<>();
   490                 for (EventDispatcher e : configuration.getDispatchers()) {
   259                 for (EventDispatcher e : c.getDispatchers()) {
   491                     if (e.accepts(type)) {
   260                     if (e.accepts(type)) {
   492                         list.add(e);
   261                         list.add(e);
   493                     }
   262                     }
   494                 }
   263                 }
   495                 dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
   264                 dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
   496                 configuration.dispatcherLookup.put(type.getId(), dispatchers);
   265                 c.dispatcherLookup.put(type.getId(), dispatchers);
   497             }
   266             }
   498             lastEventDispatch = dispatchers;
   267             c.cacheDispatchers = dispatchers;
   499         }
   268         }
   500         for (int i = 0; i < dispatchers.length; i++) {
   269         for (int i = 0; i < dispatchers.length; i++) {
   501             try {
   270             try {
   502                 dispatchers[i].offer(event);
   271                 dispatchers[i].offer(event);
   503             } catch (Exception e) {
   272             } catch (Exception e) {
   527         return configuration.isClosed();
   296         return configuration.isClosed();
   528     }
   297     }
   529 
   298 
   530     protected final void startAsync(long startNanos) {
   299     protected final void startAsync(long startNanos) {
   531         startInternal(startNanos);
   300         startInternal(startNanos);
       
   301         Runnable r = () -> run(accessControllerContext);
       
   302         thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r);
   532         thread.start();
   303         thread.start();
   533     }
   304     }
   534 
   305 
   535     protected final void start(long startNanos) {
   306     protected final void start(long startNanos) {
   536         startInternal(startNanos);
   307         startInternal(startNanos);
       
   308         thread = Thread.currentThread();
   537         run(accessControllerContext);
   309         run(accessControllerContext);
   538     }
   310     }
   539 
   311 
   540     protected final Runnable getFlushOperation() {
   312     protected final Runnable getFlushOperation() {
   541         return flushOperation;
   313         return flushOperation;
   578             process();
   350             process();
   579         } catch (Exception e) {
   351         } catch (Exception e) {
   580             defaultErrorHandler(e);
   352             defaultErrorHandler(e);
   581         } finally {
   353         } finally {
   582             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
   354             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
       
   355             try {
       
   356                 close();
       
   357             } finally {
       
   358                 synchronized (terminated) {
       
   359                     terminated.notifyAll();
       
   360                 }
       
   361             }
   583         }
   362         }
   584     }
   363     }
   585 
   364 
   586     private void handleError(Throwable e) {
   365     private void handleError(Throwable e) {
   587         Consumer<?>[] consumers = configuration.errorActions;
   366         Consumer<?>[] consumers = configuration.errorActions;
   589             defaultErrorHandler(e);
   368             defaultErrorHandler(e);
   590             return;
   369             return;
   591         }
   370         }
   592         for (int i = 0; i < consumers.length; i++) {
   371         for (int i = 0; i < consumers.length; i++) {
   593             @SuppressWarnings("unchecked")
   372             @SuppressWarnings("unchecked")
   594             Consumer<Throwable> c = (Consumer<Throwable>) consumers[i];
   373             Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
   595             c.accept(e);
   374             conusmer.accept(e);
   596         }
   375         }
   597     }
       
   598 
       
   599     private void defaultErrorHandler(Throwable e) {
       
   600         e.printStackTrace();
       
   601     }
   376     }
   602 
   377 
   603     private void runFlushActions() {
   378     private void runFlushActions() {
   604         Runnable[] flushActions = configuration.getFlushActions();
   379         Runnable[] flushActions = configuration.getFlushActions();
   605         for (int i = 0; i < flushActions.length; i++) {
   380         for (int i = 0; i < flushActions.length; i++) {
   609                 handleError(e);
   384                 handleError(e);
   610             }
   385             }
   611         }
   386         }
   612     }
   387     }
   613 
   388 
   614     private void run(AccessControlContext acc) {
   389     private void run(AccessControlContext accessControlContext) {
   615         AccessController.doPrivileged(new PrivilegedAction<Void>() {
   390         AccessController.doPrivileged(new PrivilegedAction<Void>() {
   616             @Override
   391             @Override
   617             public Void run() {
   392             public Void run() {
   618                 execute();
   393                 execute();
   619                 return null;
   394                 return null;
   620             }
   395             }
   621         }, acc);
   396         }, accessControlContext);
       
   397     }
       
   398 
       
   399     private String nextThreadName() {
       
   400         counter.incrementAndGet();
       
   401         return "JFR Event Stream " + counter;
       
   402     }
       
   403 
       
   404     private void defaultErrorHandler(Throwable e) {
       
   405         e.printStackTrace();
   622     }
   406     }
   623 }
   407 }