--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Mon Sep 02 21:08:41 2019 +0200
@@ -45,6 +45,7 @@
import jdk.jfr.internal.LogTag;
import jdk.jfr.internal.Logger;
import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.InternalEventFilter;
@@ -59,18 +60,18 @@
* - security
*
*/
-abstract class AbstractEventStream implements Runnable {
+abstract class AbstractEventStream implements EventStream {
- public static final class StreamConfiguration {
+ protected static final class StreamConfiguration {
private static final Runnable[] NO_ACTIONS = new Runnable[0];
+ private Consumer<?>[] errorActions = new Consumer<?>[0];
private Runnable[] flushActions = NO_ACTIONS;
private Runnable[] closeActions = NO_ACTIONS;
- private Runnable[] errorActions = NO_ACTIONS;
-
- private EventDispatcher[] dispatchers = NO_DISPATCHERS;
+ private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+
private boolean changedConfiguration = false;
private boolean closed = false;
private boolean reuse = true;
@@ -84,6 +85,7 @@
public StreamConfiguration(StreamConfiguration configuration) {
this.flushActions = configuration.flushActions;
this.closeActions = configuration.closeActions;
+ this.errorActions = configuration.errorActions;
this.dispatchers = configuration.dispatchers;
this.eventFilter = configuration.eventFilter;
this.closed = configuration.closed;
@@ -100,50 +102,50 @@
public StreamConfiguration() {
}
- final public StreamConfiguration remove(Object action) {
+ public StreamConfiguration remove(Object action) {
flushActions = remove(flushActions, action);
closeActions = remove(closeActions, action);
dispatchers = removeDispatch(dispatchers, action);
return this;
}
- final public StreamConfiguration addDispatcher(EventDispatcher e) {
+ public StreamConfiguration addDispatcher(EventDispatcher e) {
dispatchers = add(dispatchers, e);
eventFilter = buildFilter(dispatchers);
dispatcherLookup = new LongMap<>();
return this;
}
- final public StreamConfiguration addFlushAction(Runnable action) {
+ public StreamConfiguration addFlushAction(Runnable action) {
flushActions = add(flushActions, action);
return this;
}
- final public StreamConfiguration addCloseAction(Runnable action) {
+ public StreamConfiguration addCloseAction(Runnable action) {
closeActions = add(closeActions, action);
return this;
}
- public StreamConfiguration addErrorAction(Runnable action) {
+ public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
errorActions = add(errorActions, action);
return this;
}
- final public StreamConfiguration setClosed(boolean closed) {
+ public StreamConfiguration setClosed(boolean closed) {
this.closed = closed;
changedConfiguration = true;
return this;
}
- final public boolean isClosed() {
+ public boolean isClosed() {
return closed;
}
- final public Runnable[] getCloseActions() {
+ public Runnable[] getCloseActions() {
return closeActions;
}
- final public Runnable[] getFlushActions() {
+ public Runnable[] getFlushActions() {
return flushActions;
}
@@ -197,13 +199,13 @@
return ef;
}
- final public StreamConfiguration setReuse(boolean reuse) {
+ public StreamConfiguration setReuse(boolean reuse) {
this.reuse = reuse;
changedConfiguration = true;
return this;
}
- final public StreamConfiguration setOrdered(boolean ordered) {
+ public StreamConfiguration setOrdered(boolean ordered) {
this.ordered = ordered;
changedConfiguration = true;
return this;
@@ -216,14 +218,14 @@
return this;
}
- final public StreamConfiguration setStartTime(Instant startTime) {
+ public StreamConfiguration setStartTime(Instant startTime) {
this.startTime = startTime;
this.startNanos = Utils.timeToNanos(startTime);
changedConfiguration = true;
return this;
}
- final public Instant getStartTime() {
+ public Instant getStartTime() {
return startTime;
}
@@ -231,50 +233,50 @@
return endTime;
}
- final public boolean isStarted() {
+ public boolean isStarted() {
return started;
}
- final public StreamConfiguration setStartNanos(long startNanos) {
+ public StreamConfiguration setStartNanos(long startNanos) {
this.startNanos = startNanos;
changedConfiguration = true;
return this;
}
- final public void setStarted(boolean started) {
+ public void setStarted(boolean started) {
this.started = started;
changedConfiguration = true;
}
- final public boolean hasChanged() {
+ public boolean hasChanged() {
return changedConfiguration;
}
- final public boolean getReuse() {
+ public boolean getReuse() {
return reuse;
}
- final public boolean getOrdered() {
+ public boolean getOrdered() {
return ordered;
}
- final public InternalEventFilter getFiler() {
+ public InternalEventFilter getFiler() {
return eventFilter;
}
- final public long getStartNanos() {
+ public long getStartNanos() {
return startNanos;
}
- final public long getEndNanos() {
+ public long getEndNanos() {
return endNanos;
}
- final public InternalEventFilter getFilter() {
+ public InternalEventFilter getFilter() {
return eventFilter;
}
- final public String toString() {
+ public String toString() {
StringBuilder sb = new StringBuilder();
for (Runnable flush : flushActions) {
sb.append("Flush Action: ").append(flush).append("\n");
@@ -282,6 +284,9 @@
for (Runnable close : closeActions) {
sb.append("Close Action: " + close + "\n");
}
+ for (Consumer<?> error : errorActions) {
+ sb.append("Error Action: " + error + "\n");
+ }
for (EventDispatcher dispatcher : dispatchers) {
sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
}
@@ -301,9 +306,8 @@
}
}
- final static class EventDispatcher {
+ private final static class EventDispatcher {
final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-
final private String eventName;
final private Consumer<RecordedEvent> action;
@@ -325,13 +329,13 @@
}
}
- public final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
+ final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
- private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
- private final AccessControlContext accessControlContext;
private final Thread thread;
private final boolean active;
- protected final Runnable flushOperation = () -> runFlushActions();
+ private final Runnable flushOperation = () -> runFlushActions();
+ private final AccessControlContext accessControllerContext;
+ private final Object configurationLock = new Object();
// Modified by updateConfiguration()
protected volatile StreamConfiguration configuration = new StreamConfiguration();
@@ -341,21 +345,231 @@
private EventDispatcher[] lastEventDispatch;
public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
- this.accessControlContext = acc;
+ this.accessControllerContext = Objects.requireNonNull(acc);
this.active = active;
- // Create thread object in constructor to ensure caller has
- // permission before constructing object
- thread = new Thread(this);
+ this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc));
+ }
+
+ @Override
+ abstract public void start();
+
+ @Override
+ abstract public void startAsync();
+
+ @Override
+ abstract public void close();
+
+ // Purpose of synchronizing the following methods is
+ // to serialize changes to the configuration, so only one
+ // thread at a time can change the configuration.
+ //
+ // The purpose is not to guard the configuration field. A new
+ // configuration is published using updateConfiguration
+ //
+ @Override
+ public final void setOrdered(boolean ordered) {
+ synchronized (configurationLock) {
+ updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
+ }
+ }
+
+ @Override
+ public final void setReuse(boolean reuse) {
+ synchronized (configurationLock) {
+ updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
+ }
+ }
+
+ @Override
+ public final void setStartTime(Instant startTime) {
+ Objects.nonNull(startTime);
+ synchronized (configurationLock) {
+ if (configuration.isStarted()) {
+ throw new IllegalStateException("Stream is already started");
+ }
+ if (startTime.isBefore(Instant.EPOCH)) {
+ startTime = Instant.EPOCH;
+ }
+ updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
+ }
+ }
+
+ @Override
+ public final void setEndTime(Instant endTime) {
+ Objects.requireNonNull(endTime);
+ synchronized (configurationLock) {
+ if (configuration.isStarted()) {
+ throw new IllegalStateException("Stream is already started");
+ }
+ updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
+ }
+ }
+
+ @Override
+ public final void onEvent(Consumer<RecordedEvent> action) {
+ Objects.requireNonNull(action);
+ synchronized (configurationLock) {
+ add(new EventDispatcher(action));
+ }
+ }
+
+ @Override
+ public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
+ Objects.requireNonNull(eventName);
+ Objects.requireNonNull(action);
+ synchronized (configurationLock) {
+ add(new EventDispatcher(eventName, action));
+ }
+ }
+
+ @Override
+ public final void onFlush(Runnable action) {
+ Objects.requireNonNull(action);
+ synchronized (configurationLock) {
+ updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
+ }
+ }
+
+ @Override
+ public final void onClose(Runnable action) {
+ Objects.requireNonNull(action);
+ synchronized (configurationLock) {
+ updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
+ }
+ }
+
+ @Override
+ public final void onError(Consumer<Throwable> action) {
+ Objects.requireNonNull(action);
+ synchronized (configurationLock) {
+ updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
+ }
+ }
+
+ @Override
+ public final boolean remove(Object action) {
+ Objects.requireNonNull(action);
+ synchronized (configurationLock) {
+ return updateConfiguration(new StreamConfiguration(configuration).remove(action));
+ }
}
- public final void run() {
- AccessController.doPrivileged(new PrivilegedAction<Void>() {
- @Override
- public Void run() {
- execute();
- return null;
+ @Override
+ public final void awaitTermination() {
+ awaitTermination(Duration.ofMillis(0));
+ }
+
+ @Override
+ public final void awaitTermination(Duration timeout) {
+ Objects.requireNonNull(timeout);
+ if (thread != Thread.currentThread()) {
+ try {
+ thread.join(timeout.toMillis());
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ protected abstract void process() throws Exception;
+
+ protected final void clearLastDispatch() {
+ lastEventDispatch = null;
+ lastEventType = null;
+ }
+
+ protected final void dispatch(RecordedEvent event) {
+ EventType type = event.getEventType();
+ EventDispatcher[] dispatchers = null;
+ if (type == lastEventType) {
+ dispatchers = lastEventDispatch;
+ } else {
+ dispatchers = configuration.dispatcherLookup.get(type.getId());
+ if (dispatchers == null) {
+ List<EventDispatcher> list = new ArrayList<>();
+ for (EventDispatcher e : configuration.getDispatchers()) {
+ if (e.accepts(type)) {
+ list.add(e);
+ }
+ }
+ dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
+ configuration.dispatcherLookup.put(type.getId(), dispatchers);
+ }
+ lastEventDispatch = dispatchers;
+ }
+ for (int i = 0; i < dispatchers.length; i++) {
+ try {
+ dispatchers[i].offer(event);
+ } catch (Exception e) {
+ handleError(e);
}
- }, accessControlContext);
+ }
+ }
+
+ protected final void runCloseActions() {
+ Runnable[] closeActions = configuration.getCloseActions();
+ for (int i = 0; i < closeActions.length; i++) {
+ try {
+ closeActions[i].run();
+ } catch (Exception e) {
+ handleError(e);
+ }
+ }
+ }
+
+ protected final void setClosed(boolean closed) {
+ synchronized (configurationLock) {
+ updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
+ }
+ }
+
+ protected final boolean isClosed() {
+ return configuration.isClosed();
+ }
+
+ protected final void startAsync(long startNanos) {
+ startInternal(startNanos);
+ thread.start();
+ }
+
+ protected final void start(long startNanos) {
+ startInternal(startNanos);
+ run(accessControllerContext);
+ }
+
+ protected final Runnable getFlushOperation() {
+ return flushOperation;
+ }
+
+ private void add(EventDispatcher e) {
+ updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
+ }
+
+ private boolean updateConfiguration(StreamConfiguration newConfiguration) {
+ if (!Thread.holdsLock(configurationLock)) {
+ throw new InternalError("Modification of configuration without proper lock");
+ }
+ if (newConfiguration.hasChanged()) {
+ // Publish objects held by configuration object
+ VarHandle.releaseFence();
+ configuration = newConfiguration;
+ return true;
+ }
+ return false;
+ }
+
+ private void startInternal(long startNanos) {
+ synchronized (configurationLock) {
+ if (configuration.isStarted()) {
+ throw new IllegalStateException("Event stream can only be started once");
+ }
+ StreamConfiguration c = new StreamConfiguration(configuration);
+ if (active) {
+ c.setStartNanos(startNanos);
+ }
+ c.setStarted(true);
+ updateConfiguration(c);
+ }
}
private void execute() {
@@ -369,202 +583,41 @@
}
}
- public abstract void process() throws Exception;
-
- protected final void clearLastDispatch() {
- lastEventDispatch = null;
- lastEventType = null;
- }
-
- protected final void dispatch(RecordedEvent event) {
- EventType type = event.getEventType();
- EventDispatcher[] ret = null;
- if (type == lastEventType) {
- ret = lastEventDispatch;
- } else {
- ret = configuration.dispatcherLookup.get(type.getId());
- if (ret == null) {
- List<EventDispatcher> list = new ArrayList<>();
- for (EventDispatcher e : configuration.getDispatchers()) {
- if (e.accepts(type)) {
- list.add(e);
- }
- }
- ret = list.isEmpty() ? NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
- configuration.dispatcherLookup.put(type.getId(), ret);
- }
- lastEventDispatch = ret;
+ private void handleError(Throwable e) {
+ Consumer<?>[] consumers = configuration.errorActions;
+ if (consumers.length == 0) {
+ defaultErrorHandler(e);
+ return;
}
- for (int i = 0; i < ret.length; i++) {
- try {
- ret[i].offer(event);
- } catch (Exception e) {
- handleError(e);
- }
+ for (int i = 0; i < consumers.length; i++) {
+ @SuppressWarnings("unchecked")
+ Consumer<Throwable> c = (Consumer<Throwable>) consumers[i];
+ c.accept(e);
}
}
- protected final void handleError(Throwable e) {
- StreamConfiguration c = configuration;
- if (c.errorActions.length == 0) {
- defaultErrorHandler(e);
- return;
- }
- for (Runnable r : c.errorActions) {
- r.run();
- }
- }
-
- protected final void defaultErrorHandler(Throwable e) {
+ private void defaultErrorHandler(Throwable e) {
e.printStackTrace();
}
- public final void runCloseActions() {
- Runnable[] cas = configuration.getCloseActions();
- for (int i = 0; i < cas.length; i++) {
+ private void runFlushActions() {
+ Runnable[] flushActions = configuration.getFlushActions();
+ for (int i = 0; i < flushActions.length; i++) {
try {
- cas[i].run();
+ flushActions[i].run();
} catch (Exception e) {
handleError(e);
}
}
}
- public final void runFlushActions() {
- Runnable[] fas = configuration.getFlushActions();
- for (int i = 0; i < fas.length; i++) {
- try {
- fas[i].run();
- } catch (Exception e) {
- handleError(e);
+ private void run(AccessControlContext acc) {
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ execute();
+ return null;
}
- }
-
- }
-
- // Purpose of synchronizing the following methods is
- // to serialize changes to the configuration, so only one
- // thread at a time can change the configuration.
- //
- // The purpose is not to guard the configuration field. A new
- // configuration is published using updateConfiguration
- //
- public final synchronized boolean remove(Object action) {
- return updateConfiguration(new StreamConfiguration(configuration).remove(action));
- }
-
- public final synchronized void onEvent(Consumer<RecordedEvent> action) {
- add(new EventDispatcher(action));
- }
-
- public final synchronized void onEvent(String eventName, Consumer<RecordedEvent> action) {
- add(new EventDispatcher(eventName, action));
- }
-
- private final synchronized void add(EventDispatcher e) {
- updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
- }
-
- public final synchronized void onFlush(Runnable action) {
- updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
- }
-
- public final synchronized void addCloseAction(Runnable action) {
- updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
- }
-
- public final synchronized void addErrorAction(Runnable action) {
- updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
- }
-
- public final synchronized void setClosed(boolean closed) {
- updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
- }
-
- public final synchronized void setReuse(boolean reuse) {
- updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
- }
-
- public final synchronized void setOrdered(boolean ordered) {
- updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
- }
-
- public final synchronized void setStartNanos(long startNanos) {
- updateConfiguration(new StreamConfiguration(configuration).setStartNanos(startNanos));
+ }, acc);
}
-
- public final synchronized void setStartTime(Instant startTime) {
- Objects.nonNull(startTime);
- if (configuration.isStarted()) {
- throw new IllegalStateException("Stream is already started");
- }
- if (startTime.isBefore(Instant.EPOCH)) {
- startTime = Instant.EPOCH;
- }
- updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
- }
-
- public final synchronized void setEndTime(Instant endTime) {
- if (configuration.isStarted()) {
- throw new IllegalStateException("Stream is already started");
- }
- updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
- }
-
- protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
- if (!Thread.holdsLock(this)) {
- throw new InternalError("Modification of configuration without proper lock");
- }
- if (newConfiguration.hasChanged()) {
- // Publish objects held by configuration object
- VarHandle.releaseFence();
- configuration = newConfiguration;
- return true;
- }
- return false;
- }
-
- public final boolean isClosed() {
- return configuration.isClosed();
- }
-
- public final void startAsync(long startNanos) {
- startInternal(startNanos);
- thread.start();
- }
-
- public final void start(long startNanos) {
- startInternal(startNanos);
- run();
- }
-
- private synchronized void startInternal(long startNanos) {
- if (configuration.isStarted()) {
- throw new IllegalStateException("Event stream can only be started once");
- }
- StreamConfiguration c = new StreamConfiguration(configuration);
- if (active) {
- c.setStartNanos(startNanos);
- }
- c.setStarted(true);
- updateConfiguration(c);
- }
-
- public final void awaitTermination(Duration timeout) {
- Objects.requireNonNull(timeout);
- if (thread != Thread.currentThread()) {
- try {
- thread.join(timeout.toMillis());
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
-
- public final void awaitTermination() {
- awaitTermination(Duration.ofMillis(0));
- }
-
- abstract public void close();
-
}
\ No newline at end of file