src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
branchJEP-349-branch
changeset 57971 aa7b1ea52413
parent 57948 59ad17a19e24
child 57985 be121cbf3284
equal deleted inserted replaced
57949:74a38c0b5054 57971:aa7b1ea52413
    64     public static final class StreamConfiguration {
    64     public static final class StreamConfiguration {
    65         private static final Runnable[] NO_ACTIONS = new Runnable[0];
    65         private static final Runnable[] NO_ACTIONS = new Runnable[0];
    66 
    66 
    67         private Runnable[] flushActions = NO_ACTIONS;
    67         private Runnable[] flushActions = NO_ACTIONS;
    68         private Runnable[] closeActions = NO_ACTIONS;
    68         private Runnable[] closeActions = NO_ACTIONS;
       
    69         private Runnable[] errorActions = NO_ACTIONS;
       
    70 
    69         private EventDispatcher[] dispatchers = NO_DISPATCHERS;
    71         private EventDispatcher[] dispatchers = NO_DISPATCHERS;
    70         private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
    72         private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
       
    73         private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
       
    74         private boolean changedConfiguration = false;
    71         private boolean closed = false;
    75         private boolean closed = false;
    72         private boolean reuse = true;
    76         private boolean reuse = true;
    73         private boolean ordered = true;
    77         private boolean ordered = true;
    74         private Instant startTime = null;
    78         private Instant startTime = null;
    75         private Instant endTime = null;
    79         private Instant endTime = null;
    76         private boolean started = false;
    80         private boolean started = false;
    77         private long startNanos = 0;
    81         private long startNanos = 0;
    78         private long endNanos = Long.MAX_VALUE;
    82         private long endNanos = Long.MAX_VALUE;
    79         private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
       
    80         private boolean changed = false;
       
    81 
    83 
    82         public StreamConfiguration(StreamConfiguration configuration) {
    84         public StreamConfiguration(StreamConfiguration configuration) {
    83             this.flushActions = configuration.flushActions;
    85             this.flushActions = configuration.flushActions;
    84             this.closeActions = configuration.closeActions;
    86             this.closeActions = configuration.closeActions;
    85             this.dispatchers = configuration.dispatchers;
    87             this.dispatchers = configuration.dispatchers;
   120         final public StreamConfiguration addCloseAction(Runnable action) {
   122         final public StreamConfiguration addCloseAction(Runnable action) {
   121             closeActions = add(closeActions, action);
   123             closeActions = add(closeActions, action);
   122             return this;
   124             return this;
   123         }
   125         }
   124 
   126 
       
   127         public StreamConfiguration addErrorAction(Runnable action) {
       
   128             errorActions = add(errorActions, action);
       
   129             return this;
       
   130         }
       
   131 
   125         final public StreamConfiguration setClosed(boolean closed) {
   132         final public StreamConfiguration setClosed(boolean closed) {
   126             this.closed = closed;
   133             this.closed = closed;
   127             changed = true;
   134             changedConfiguration = true;
   128             return this;
   135             return this;
   129         }
   136         }
   130 
   137 
   131         final public boolean isClosed() {
   138         final public boolean isClosed() {
   132             return closed;
   139             return closed;
   152             }
   159             }
   153             EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
   160             EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
   154             if (modified) {
   161             if (modified) {
   155                 eventFilter = buildFilter(result);
   162                 eventFilter = buildFilter(result);
   156                 dispatcherLookup = new LongMap<>();
   163                 dispatcherLookup = new LongMap<>();
   157                 changed = true;
   164                 changedConfiguration = true;
   158             }
   165             }
   159             return result;
   166             return result;
   160         }
   167         }
   161 
   168 
   162         private <T> T[] remove(T[] array, Object action) {
   169         private <T> T[] remove(T[] array, Object action) {
   163             List<T> list = new ArrayList<>(array.length);
   170             List<T> list = new ArrayList<>(array.length);
   164             for (int i = 0; i < array.length; i++) {
   171             for (int i = 0; i < array.length; i++) {
   165                 if (array[i] != action) {
   172                 if (array[i] != action) {
   166                     list.add(array[i]);
   173                     list.add(array[i]);
   167                 } else {
   174                 } else {
   168                     changed = true;
   175                     changedConfiguration = true;
   169                 }
   176                 }
   170             }
   177             }
   171             return list.toArray(array);
   178             return list.toArray(array);
   172         }
   179         }
   173 
   180 
   174         private <T> T[] add(T[] array, T object) {
   181         private <T> T[] add(T[] array, T object) {
   175             List<T> list = new ArrayList<>(Arrays.asList(array));
   182             List<T> list = new ArrayList<>(Arrays.asList(array));
   176             list.add(object);
   183             list.add(object);
   177             changed = true;
   184             changedConfiguration = true;
   178             return list.toArray(array);
   185             return list.toArray(array);
   179         }
   186         }
   180 
   187 
   181         private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
   188         private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
   182             InternalEventFilter ef = new InternalEventFilter();
   189             InternalEventFilter ef = new InternalEventFilter();
   190             return ef;
   197             return ef;
   191         }
   198         }
   192 
   199 
   193         final public StreamConfiguration setReuse(boolean reuse) {
   200         final public StreamConfiguration setReuse(boolean reuse) {
   194             this.reuse = reuse;
   201             this.reuse = reuse;
   195             changed = true;
   202             changedConfiguration = true;
   196             return this;
   203             return this;
   197         }
   204         }
   198 
   205 
   199         final public StreamConfiguration setOrdered(boolean ordered) {
   206         final public StreamConfiguration setOrdered(boolean ordered) {
   200             this.ordered = ordered;
   207             this.ordered = ordered;
   201             changed = true;
   208             changedConfiguration = true;
   202             return this;
   209             return this;
   203         }
   210         }
       
   211 
   204         public StreamConfiguration setEndTime(Instant endTime) {
   212         public StreamConfiguration setEndTime(Instant endTime) {
   205             this.endTime = endTime;
   213             this.endTime = endTime;
   206             this.endNanos = Utils.timeToNanos(endTime);
   214             this.endNanos = Utils.timeToNanos(endTime);
   207             changed = true;
   215             changedConfiguration = true;
   208             return this;
   216             return this;
   209         }
   217         }
   210 
   218 
   211         final public StreamConfiguration setStartTime(Instant startTime) {
   219         final public StreamConfiguration setStartTime(Instant startTime) {
   212             this.startTime = startTime;
   220             this.startTime = startTime;
   213             this.startNanos = Utils.timeToNanos(startTime);
   221             this.startNanos = Utils.timeToNanos(startTime);
   214             changed = true;
   222             changedConfiguration = true;
   215             return this;
   223             return this;
   216         }
   224         }
   217 
   225 
   218         final public Instant getStartTime() {
   226         final public Instant getStartTime() {
   219             return startTime;
   227             return startTime;
   227             return started;
   235             return started;
   228         }
   236         }
   229 
   237 
   230         final public StreamConfiguration setStartNanos(long startNanos) {
   238         final public StreamConfiguration setStartNanos(long startNanos) {
   231             this.startNanos = startNanos;
   239             this.startNanos = startNanos;
   232             changed = true;
   240             changedConfiguration = true;
   233             return this;
   241             return this;
   234         }
   242         }
   235 
   243 
   236         final public void setStarted(boolean started) {
   244         final public void setStarted(boolean started) {
   237             this.started = started;
   245             this.started = started;
   238             changed = true;
   246             changedConfiguration = true;
   239         }
   247         }
   240 
   248 
   241         final public boolean hasChanged() {
   249         final public boolean hasChanged() {
   242             return changed;
   250             return changedConfiguration;
   243         }
   251         }
   244 
   252 
   245         final public boolean getReuse() {
   253         final public boolean getReuse() {
   246             return reuse;
   254             return reuse;
   247         }
   255         }
   289         }
   297         }
   290 
   298 
   291         private EventDispatcher[] getDispatchers() {
   299         private EventDispatcher[] getDispatchers() {
   292             return dispatchers;
   300             return dispatchers;
   293         }
   301         }
   294 
       
   295 
       
   296 
       
   297 
       
   298     }
   302     }
   299 
   303 
   300     final static class EventDispatcher {
   304     final static class EventDispatcher {
   301         final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
   305         final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
   302 
   306 
   327     private final AccessControlContext accessControlContext;
   331     private final AccessControlContext accessControlContext;
   328     private final Thread thread;
   332     private final Thread thread;
   329     private final boolean active;
   333     private final boolean active;
   330     protected final Runnable flushOperation = () -> runFlushActions();
   334     protected final Runnable flushOperation = () -> runFlushActions();
   331 
   335 
   332     // Updated by updateConfiguration()
   336     // Modified by updateConfiguration()
   333     protected volatile StreamConfiguration configuration = new StreamConfiguration();
   337     protected volatile StreamConfiguration configuration = new StreamConfiguration();
   334 
   338 
   335     // Cache the last event type and dispatch.
   339     // Cache the last event type and dispatch.
   336     private EventType lastEventType;
   340     private EventType lastEventType;
   337     private EventDispatcher[] lastEventDispatch;
   341     private EventDispatcher[] lastEventDispatch;
   350             public Void run() {
   354             public Void run() {
   351                 execute();
   355                 execute();
   352                 return null;
   356                 return null;
   353             }
   357             }
   354         }, accessControlContext);
   358         }, accessControlContext);
   355 
       
   356     }
   359     }
   357 
   360 
   358     private void execute() {
   361     private void execute() {
   359         JVM.getJVM().exclude(Thread.currentThread());
   362         JVM.getJVM().exclude(Thread.currentThread());
   360         try {
   363         try {
   361             process();
   364             process();
   362         } catch (IOException e) {
       
   363             if (!isClosed()) {
       
   364                 logException(e);
       
   365             }
       
   366         } catch (Exception e) {
   365         } catch (Exception e) {
   367             logException(e);
   366             defaultErrorHandler(e);
   368         } finally {
   367         } finally {
   369             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
   368             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
   370         }
   369         }
   371     }
   370     }
   372 
   371 
   373     private void logException(Exception e) {
   372     public abstract void process() throws Exception;
   374         // FIXME: e.printStackTrace(); for debugging purposes,
       
   375         // remove before before integration
       
   376         e.printStackTrace();
       
   377         Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
       
   378     }
       
   379 
       
   380     public abstract void process() throws IOException;
       
   381 
   373 
   382     protected final void clearLastDispatch() {
   374     protected final void clearLastDispatch() {
   383         lastEventDispatch = null;
   375         lastEventDispatch = null;
   384         lastEventType = null;
   376         lastEventType = null;
   385     }
   377     }
   405         }
   397         }
   406         for (int i = 0; i < ret.length; i++) {
   398         for (int i = 0; i < ret.length; i++) {
   407             try {
   399             try {
   408                 ret[i].offer(event);
   400                 ret[i].offer(event);
   409             } catch (Exception e) {
   401             } catch (Exception e) {
   410                 logException(e);
   402                 handleError(e);
   411             }
   403             }
   412         }
   404         }
       
   405     }
       
   406 
       
   407     protected final void handleError(Throwable e) {
       
   408         StreamConfiguration c = configuration;
       
   409         if (c.errorActions.length == 0) {
       
   410             defaultErrorHandler(e);
       
   411             return;
       
   412         }
       
   413         for (Runnable r : c.errorActions) {
       
   414             r.run();
       
   415         }
       
   416     }
       
   417 
       
   418     protected final void defaultErrorHandler(Throwable e) {
       
   419         e.printStackTrace();
   413     }
   420     }
   414 
   421 
   415     public final void runCloseActions() {
   422     public final void runCloseActions() {
   416         Runnable[] cas = configuration.getCloseActions();
   423         Runnable[] cas = configuration.getCloseActions();
   417         for (int i = 0; i < cas.length; i++) {
   424         for (int i = 0; i < cas.length; i++) {
   418             try {
   425             try {
   419                 cas[i].run();
   426                 cas[i].run();
   420             } catch (Exception e) {
   427             } catch (Exception e) {
   421                 logException(e);
   428                 handleError(e);
   422             }
   429             }
   423         }
   430         }
   424     }
   431     }
   425 
   432 
   426     public final void runFlushActions() {
   433     public final void runFlushActions() {
   427         Runnable[] fas = configuration.getFlushActions();
   434         Runnable[] fas = configuration.getFlushActions();
   428         for (int i = 0; i < fas.length; i++) {
   435         for (int i = 0; i < fas.length; i++) {
   429             try {
   436             try {
   430                 fas[i].run();
   437                 fas[i].run();
   431             } catch (Exception e) {
   438             } catch (Exception e) {
   432                 logException(e);
   439                 handleError(e);
   433             }
   440             }
   434         }
   441         }
       
   442 
   435     }
   443     }
   436 
   444 
   437     // Purpose of synchronizing the following methods is
   445     // Purpose of synchronizing the following methods is
   438     // to serialize changes to the configuration, so only one
   446     // to serialize changes to the configuration, so only one
   439     // thread at a time can change the configuration.
   447     // thread at a time can change the configuration.
   463 
   471 
   464     public final synchronized void addCloseAction(Runnable action) {
   472     public final synchronized void addCloseAction(Runnable action) {
   465         updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
   473         updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
   466     }
   474     }
   467 
   475 
       
   476     public final synchronized void addErrorAction(Runnable action) {
       
   477         updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
       
   478     }
       
   479 
   468     public final synchronized void setClosed(boolean closed) {
   480     public final synchronized void setClosed(boolean closed) {
   469         updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
   481         updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
   470     }
   482     }
   471 
   483 
   472     public final synchronized void setReuse(boolean reuse) {
   484     public final synchronized void setReuse(boolean reuse) {
   490             startTime = Instant.EPOCH;
   502             startTime = Instant.EPOCH;
   491         }
   503         }
   492         updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
   504         updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
   493     }
   505     }
   494 
   506 
   495     public final void setEndTime(Instant endTime) {
   507     public final synchronized void setEndTime(Instant endTime) {
   496     if (configuration.isStarted()) {
   508         if (configuration.isStarted()) {
   497         throw new IllegalStateException("Stream is already started");
   509             throw new IllegalStateException("Stream is already started");
   498     }
   510         }
   499     updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
   511         updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
   500 }
   512     }
   501 
       
   502 
   513 
   503     protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
   514     protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
   504         // Changes to the configuration must happen one at a time, so make
   515         if (!Thread.holdsLock(this)) {
   505         // sure that we have the monitor
   516             throw new InternalError("Modification of configuration without proper lock");
   506         Thread.holdsLock(this);
   517         }
   507         if (newConfiguration.hasChanged()) {
   518         if (newConfiguration.hasChanged()) {
   508             // Publish objects held by configuration object
   519             // Publish objects held by configuration object
   509             VarHandle.releaseFence();
   520             VarHandle.releaseFence();
   510             configuration = newConfiguration;
   521             configuration = newConfiguration;
   511             return true;
   522             return true;
   525     public final void start(long startNanos) {
   536     public final void start(long startNanos) {
   526         startInternal(startNanos);
   537         startInternal(startNanos);
   527         run();
   538         run();
   528     }
   539     }
   529 
   540 
   530     private void startInternal(long startNanos) {
   541     private synchronized void startInternal(long startNanos) {
   531         synchronized (this) {
   542         if (configuration.isStarted()) {
   532             if (configuration.isStarted()) {
   543             throw new IllegalStateException("Event stream can only be started once");
   533                 throw new IllegalStateException("Event stream can only be started once");
   544         }
   534             }
   545         StreamConfiguration c = new StreamConfiguration(configuration);
   535             StreamConfiguration c = new StreamConfiguration(configuration);
   546         if (active) {
   536             if (active) {
   547             c.setStartNanos(startNanos);
   537                 c.setStartNanos(startNanos);
   548         }
   538             }
   549         c.setStarted(true);
   539             c.setStarted(true);
   550         updateConfiguration(c);
   540             updateConfiguration(c);
       
   541         }
       
   542     }
   551     }
   543 
   552 
   544     public final void awaitTermination(Duration timeout) {
   553     public final void awaitTermination(Duration timeout) {
   545         Objects.requireNonNull(timeout);
   554         Objects.requireNonNull(timeout);
   546         if (thread != Thread.currentThread()) {
   555         if (thread != Thread.currentThread()) {