src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
branchJEP-349-branch
changeset 57985 be121cbf3284
parent 57971 aa7b1ea52413
child 58020 f082177c5023
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Mon Sep 02 21:08:41 2019 +0200
@@ -45,6 +45,7 @@
 import jdk.jfr.internal.LogTag;
 import jdk.jfr.internal.Logger;
 import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.SecuritySupport;
 import jdk.jfr.internal.Utils;
 import jdk.jfr.internal.consumer.InternalEventFilter;
 
@@ -59,18 +60,18 @@
  * - security
  *
  */
-abstract class AbstractEventStream implements Runnable {
+abstract class AbstractEventStream implements EventStream {
 
-    public static final class StreamConfiguration {
+    protected static final class StreamConfiguration {
         private static final Runnable[] NO_ACTIONS = new Runnable[0];
 
+        private Consumer<?>[] errorActions = new Consumer<?>[0];
         private Runnable[] flushActions = NO_ACTIONS;
         private Runnable[] closeActions = NO_ACTIONS;
-        private Runnable[] errorActions = NO_ACTIONS;
-
-        private EventDispatcher[] dispatchers = NO_DISPATCHERS;
+        private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
         private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
         private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+
         private boolean changedConfiguration = false;
         private boolean closed = false;
         private boolean reuse = true;
@@ -84,6 +85,7 @@
         public StreamConfiguration(StreamConfiguration configuration) {
             this.flushActions = configuration.flushActions;
             this.closeActions = configuration.closeActions;
+            this.errorActions = configuration.errorActions;
             this.dispatchers = configuration.dispatchers;
             this.eventFilter = configuration.eventFilter;
             this.closed = configuration.closed;
@@ -100,50 +102,50 @@
         public StreamConfiguration() {
         }
 
-        final public StreamConfiguration remove(Object action) {
+        public StreamConfiguration remove(Object action) {
             flushActions = remove(flushActions, action);
             closeActions = remove(closeActions, action);
             dispatchers = removeDispatch(dispatchers, action);
             return this;
         }
 
-        final public StreamConfiguration addDispatcher(EventDispatcher e) {
+        public StreamConfiguration addDispatcher(EventDispatcher e) {
             dispatchers = add(dispatchers, e);
             eventFilter = buildFilter(dispatchers);
             dispatcherLookup = new LongMap<>();
             return this;
         }
 
-        final public StreamConfiguration addFlushAction(Runnable action) {
+        public StreamConfiguration addFlushAction(Runnable action) {
             flushActions = add(flushActions, action);
             return this;
         }
 
-        final public StreamConfiguration addCloseAction(Runnable action) {
+        public StreamConfiguration addCloseAction(Runnable action) {
             closeActions = add(closeActions, action);
             return this;
         }
 
-        public StreamConfiguration addErrorAction(Runnable action) {
+        public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
             errorActions = add(errorActions, action);
             return this;
         }
 
-        final public StreamConfiguration setClosed(boolean closed) {
+        public StreamConfiguration setClosed(boolean closed) {
             this.closed = closed;
             changedConfiguration = true;
             return this;
         }
 
-        final public boolean isClosed() {
+        public boolean isClosed() {
             return closed;
         }
 
-        final public Runnable[] getCloseActions() {
+        public Runnable[] getCloseActions() {
             return closeActions;
         }
 
-        final public Runnable[] getFlushActions() {
+        public Runnable[] getFlushActions() {
             return flushActions;
         }
 
@@ -197,13 +199,13 @@
             return ef;
         }
 
-        final public StreamConfiguration setReuse(boolean reuse) {
+        public StreamConfiguration setReuse(boolean reuse) {
             this.reuse = reuse;
             changedConfiguration = true;
             return this;
         }
 
-        final public StreamConfiguration setOrdered(boolean ordered) {
+        public StreamConfiguration setOrdered(boolean ordered) {
             this.ordered = ordered;
             changedConfiguration = true;
             return this;
@@ -216,14 +218,14 @@
             return this;
         }
 
-        final public StreamConfiguration setStartTime(Instant startTime) {
+        public StreamConfiguration setStartTime(Instant startTime) {
             this.startTime = startTime;
             this.startNanos = Utils.timeToNanos(startTime);
             changedConfiguration = true;
             return this;
         }
 
-        final public Instant getStartTime() {
+        public Instant getStartTime() {
             return startTime;
         }
 
@@ -231,50 +233,50 @@
             return endTime;
         }
 
-        final public boolean isStarted() {
+        public boolean isStarted() {
             return started;
         }
 
-        final public StreamConfiguration setStartNanos(long startNanos) {
+        public StreamConfiguration setStartNanos(long startNanos) {
             this.startNanos = startNanos;
             changedConfiguration = true;
             return this;
         }
 
-        final public void setStarted(boolean started) {
+        public void setStarted(boolean started) {
             this.started = started;
             changedConfiguration = true;
         }
 
-        final public boolean hasChanged() {
+        public boolean hasChanged() {
             return changedConfiguration;
         }
 
-        final public boolean getReuse() {
+        public boolean getReuse() {
             return reuse;
         }
 
-        final public boolean getOrdered() {
+        public boolean getOrdered() {
             return ordered;
         }
 
-        final public InternalEventFilter getFiler() {
+        public InternalEventFilter getFiler() {
             return eventFilter;
         }
 
-        final public long getStartNanos() {
+        public long getStartNanos() {
             return startNanos;
         }
 
-        final public long getEndNanos() {
+        public long getEndNanos() {
             return endNanos;
         }
 
-        final public InternalEventFilter getFilter() {
+        public InternalEventFilter getFilter() {
             return eventFilter;
         }
 
-        final public String toString() {
+        public String toString() {
             StringBuilder sb = new StringBuilder();
             for (Runnable flush : flushActions) {
                 sb.append("Flush Action: ").append(flush).append("\n");
@@ -282,6 +284,9 @@
             for (Runnable close : closeActions) {
                 sb.append("Close Action: " + close + "\n");
             }
+            for (Consumer<?> error : errorActions) {
+                sb.append("Error Action: " + error + "\n");
+            }
             for (EventDispatcher dispatcher : dispatchers) {
                 sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
             }
@@ -301,9 +306,8 @@
         }
     }
 
-    final static class EventDispatcher {
+    private final static class EventDispatcher {
         final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-
         final private String eventName;
         final private Consumer<RecordedEvent> action;
 
@@ -325,13 +329,13 @@
         }
     }
 
-    public final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
+    final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
 
-    private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-    private final AccessControlContext accessControlContext;
     private final Thread thread;
     private final boolean active;
-    protected final Runnable flushOperation = () -> runFlushActions();
+    private final Runnable flushOperation = () -> runFlushActions();
+    private final AccessControlContext accessControllerContext;
+    private final Object configurationLock = new Object();
 
     // Modified by updateConfiguration()
     protected volatile StreamConfiguration configuration = new StreamConfiguration();
@@ -341,21 +345,231 @@
     private EventDispatcher[] lastEventDispatch;
 
     public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
-        this.accessControlContext = acc;
+        this.accessControllerContext = Objects.requireNonNull(acc);
         this.active = active;
-        // Create thread object in constructor to ensure caller has
-        // permission before constructing object
-        thread = new Thread(this);
+        this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc));
+    }
+
+    @Override
+    abstract public void start();
+
+    @Override
+    abstract public void startAsync();
+
+    @Override
+    abstract public void close();
+
+    // Purpose of synchronizing the following methods is
+    // to serialize changes to the configuration, so only one
+    // thread at a time can change the configuration.
+    //
+    // The purpose is not to guard the configuration field. A new
+    // configuration is published using updateConfiguration
+    //
+    @Override
+    public final void setOrdered(boolean ordered) {
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
+        }
+    }
+
+    @Override
+    public final void setReuse(boolean reuse) {
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
+        }
+    }
+
+    @Override
+    public final void setStartTime(Instant startTime) {
+        Objects.nonNull(startTime);
+        synchronized (configurationLock) {
+            if (configuration.isStarted()) {
+                throw new IllegalStateException("Stream is already started");
+            }
+            if (startTime.isBefore(Instant.EPOCH)) {
+                startTime = Instant.EPOCH;
+            }
+            updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
+        }
+    }
+
+    @Override
+    public final void setEndTime(Instant endTime) {
+        Objects.requireNonNull(endTime);
+        synchronized (configurationLock) {
+            if (configuration.isStarted()) {
+                throw new IllegalStateException("Stream is already started");
+            }
+            updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
+        }
+    }
+
+    @Override
+    public final void onEvent(Consumer<RecordedEvent> action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            add(new EventDispatcher(action));
+        }
+    }
+
+    @Override
+    public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
+        Objects.requireNonNull(eventName);
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            add(new EventDispatcher(eventName, action));
+        }
+    }
+
+    @Override
+    public final void onFlush(Runnable action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
+        }
+    }
+
+    @Override
+    public final void onClose(Runnable action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
+        }
+    }
+
+    @Override
+    public final void onError(Consumer<Throwable> action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
+        }
+    }
+
+    @Override
+    public final boolean remove(Object action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            return updateConfiguration(new StreamConfiguration(configuration).remove(action));
+        }
     }
 
-    public final void run() {
-        AccessController.doPrivileged(new PrivilegedAction<Void>() {
-            @Override
-            public Void run() {
-                execute();
-                return null;
+    @Override
+    public final void awaitTermination() {
+        awaitTermination(Duration.ofMillis(0));
+    }
+
+    @Override
+    public final void awaitTermination(Duration timeout) {
+        Objects.requireNonNull(timeout);
+        if (thread != Thread.currentThread()) {
+            try {
+                thread.join(timeout.toMillis());
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    protected abstract void process() throws Exception;
+
+    protected final void clearLastDispatch() {
+        lastEventDispatch = null;
+        lastEventType = null;
+    }
+
+    protected final void dispatch(RecordedEvent event) {
+        EventType type = event.getEventType();
+        EventDispatcher[] dispatchers = null;
+        if (type == lastEventType) {
+            dispatchers = lastEventDispatch;
+        } else {
+            dispatchers = configuration.dispatcherLookup.get(type.getId());
+            if (dispatchers == null) {
+                List<EventDispatcher> list = new ArrayList<>();
+                for (EventDispatcher e : configuration.getDispatchers()) {
+                    if (e.accepts(type)) {
+                        list.add(e);
+                    }
+                }
+                dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
+                configuration.dispatcherLookup.put(type.getId(), dispatchers);
+            }
+            lastEventDispatch = dispatchers;
+        }
+        for (int i = 0; i < dispatchers.length; i++) {
+            try {
+                dispatchers[i].offer(event);
+            } catch (Exception e) {
+                handleError(e);
             }
-        }, accessControlContext);
+        }
+    }
+
+    protected final void runCloseActions() {
+        Runnable[] closeActions = configuration.getCloseActions();
+        for (int i = 0; i < closeActions.length; i++) {
+            try {
+                closeActions[i].run();
+            } catch (Exception e) {
+                handleError(e);
+            }
+        }
+    }
+
+    protected final void setClosed(boolean closed) {
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
+        }
+    }
+
+    protected final boolean isClosed() {
+        return configuration.isClosed();
+    }
+
+    protected final void startAsync(long startNanos) {
+        startInternal(startNanos);
+        thread.start();
+    }
+
+    protected final void start(long startNanos) {
+        startInternal(startNanos);
+        run(accessControllerContext);
+    }
+
+    protected final Runnable getFlushOperation() {
+        return flushOperation;
+    }
+
+    private void add(EventDispatcher e) {
+        updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
+    }
+
+    private boolean updateConfiguration(StreamConfiguration newConfiguration) {
+        if (!Thread.holdsLock(configurationLock)) {
+            throw new InternalError("Modification of configuration without proper lock");
+        }
+        if (newConfiguration.hasChanged()) {
+            // Publish objects held by configuration object
+            VarHandle.releaseFence();
+            configuration = newConfiguration;
+            return true;
+        }
+        return false;
+    }
+
+    private void startInternal(long startNanos) {
+        synchronized (configurationLock) {
+            if (configuration.isStarted()) {
+                throw new IllegalStateException("Event stream can only be started once");
+            }
+            StreamConfiguration c = new StreamConfiguration(configuration);
+            if (active) {
+                c.setStartNanos(startNanos);
+            }
+            c.setStarted(true);
+            updateConfiguration(c);
+        }
     }
 
     private void execute() {
@@ -369,202 +583,41 @@
         }
     }
 
-    public abstract void process() throws Exception;
-
-    protected final void clearLastDispatch() {
-        lastEventDispatch = null;
-        lastEventType = null;
-    }
-
-    protected final void dispatch(RecordedEvent event) {
-        EventType type = event.getEventType();
-        EventDispatcher[] ret = null;
-        if (type == lastEventType) {
-            ret = lastEventDispatch;
-        } else {
-            ret = configuration.dispatcherLookup.get(type.getId());
-            if (ret == null) {
-                List<EventDispatcher> list = new ArrayList<>();
-                for (EventDispatcher e : configuration.getDispatchers()) {
-                    if (e.accepts(type)) {
-                        list.add(e);
-                    }
-                }
-                ret = list.isEmpty() ? NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
-                configuration.dispatcherLookup.put(type.getId(), ret);
-            }
-            lastEventDispatch = ret;
+    private void handleError(Throwable e) {
+        Consumer<?>[] consumers = configuration.errorActions;
+        if (consumers.length == 0) {
+            defaultErrorHandler(e);
+            return;
         }
-        for (int i = 0; i < ret.length; i++) {
-            try {
-                ret[i].offer(event);
-            } catch (Exception e) {
-                handleError(e);
-            }
+        for (int i = 0; i < consumers.length; i++) {
+            @SuppressWarnings("unchecked")
+            Consumer<Throwable> c = (Consumer<Throwable>) consumers[i];
+            c.accept(e);
         }
     }
 
-    protected final void handleError(Throwable e) {
-        StreamConfiguration c = configuration;
-        if (c.errorActions.length == 0) {
-            defaultErrorHandler(e);
-            return;
-        }
-        for (Runnable r : c.errorActions) {
-            r.run();
-        }
-    }
-
-    protected final void defaultErrorHandler(Throwable e) {
+    private void defaultErrorHandler(Throwable e) {
         e.printStackTrace();
     }
 
-    public final void runCloseActions() {
-        Runnable[] cas = configuration.getCloseActions();
-        for (int i = 0; i < cas.length; i++) {
+    private void runFlushActions() {
+        Runnable[] flushActions = configuration.getFlushActions();
+        for (int i = 0; i < flushActions.length; i++) {
             try {
-                cas[i].run();
+                flushActions[i].run();
             } catch (Exception e) {
                 handleError(e);
             }
         }
     }
 
-    public final void runFlushActions() {
-        Runnable[] fas = configuration.getFlushActions();
-        for (int i = 0; i < fas.length; i++) {
-            try {
-                fas[i].run();
-            } catch (Exception e) {
-                handleError(e);
+    private void run(AccessControlContext acc) {
+        AccessController.doPrivileged(new PrivilegedAction<Void>() {
+            @Override
+            public Void run() {
+                execute();
+                return null;
             }
-        }
-
-    }
-
-    // Purpose of synchronizing the following methods is
-    // to serialize changes to the configuration, so only one
-    // thread at a time can change the configuration.
-    //
-    // The purpose is not to guard the configuration field. A new
-    // configuration is published using updateConfiguration
-    //
-    public final synchronized boolean remove(Object action) {
-        return updateConfiguration(new StreamConfiguration(configuration).remove(action));
-    }
-
-    public final synchronized void onEvent(Consumer<RecordedEvent> action) {
-        add(new EventDispatcher(action));
-    }
-
-    public final synchronized void onEvent(String eventName, Consumer<RecordedEvent> action) {
-        add(new EventDispatcher(eventName, action));
-    }
-
-    private final synchronized void add(EventDispatcher e) {
-        updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
-    }
-
-    public final synchronized void onFlush(Runnable action) {
-        updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
-    }
-
-    public final synchronized void addCloseAction(Runnable action) {
-        updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
-    }
-
-    public final synchronized void addErrorAction(Runnable action) {
-        updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
-    }
-
-    public final synchronized void setClosed(boolean closed) {
-        updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
-    }
-
-    public final synchronized void setReuse(boolean reuse) {
-        updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
-    }
-
-    public final synchronized void setOrdered(boolean ordered) {
-        updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
-    }
-
-    public final synchronized void setStartNanos(long startNanos) {
-        updateConfiguration(new StreamConfiguration(configuration).setStartNanos(startNanos));
+        }, acc);
     }
-
-    public final synchronized void setStartTime(Instant startTime) {
-        Objects.nonNull(startTime);
-        if (configuration.isStarted()) {
-            throw new IllegalStateException("Stream is already started");
-        }
-        if (startTime.isBefore(Instant.EPOCH)) {
-            startTime = Instant.EPOCH;
-        }
-        updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
-    }
-
-    public final synchronized void setEndTime(Instant endTime) {
-        if (configuration.isStarted()) {
-            throw new IllegalStateException("Stream is already started");
-        }
-        updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
-    }
-
-    protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
-        if (!Thread.holdsLock(this)) {
-            throw new InternalError("Modification of configuration without proper lock");
-        }
-        if (newConfiguration.hasChanged()) {
-            // Publish objects held by configuration object
-            VarHandle.releaseFence();
-            configuration = newConfiguration;
-            return true;
-        }
-        return false;
-    }
-
-    public final boolean isClosed() {
-        return configuration.isClosed();
-    }
-
-    public final void startAsync(long startNanos) {
-        startInternal(startNanos);
-        thread.start();
-    }
-
-    public final void start(long startNanos) {
-        startInternal(startNanos);
-        run();
-    }
-
-    private synchronized void startInternal(long startNanos) {
-        if (configuration.isStarted()) {
-            throw new IllegalStateException("Event stream can only be started once");
-        }
-        StreamConfiguration c = new StreamConfiguration(configuration);
-        if (active) {
-            c.setStartNanos(startNanos);
-        }
-        c.setStarted(true);
-        updateConfiguration(c);
-    }
-
-    public final void awaitTermination(Duration timeout) {
-        Objects.requireNonNull(timeout);
-        if (thread != Thread.currentThread()) {
-            try {
-                thread.join(timeout.toMillis());
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-    }
-
-    public final void awaitTermination() {
-        awaitTermination(Duration.ofMillis(0));
-    }
-
-    abstract public void close();
-
 }
\ No newline at end of file