diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Mon Sep 02 21:08:41 2019 +0200 @@ -45,6 +45,7 @@ import jdk.jfr.internal.LogTag; import jdk.jfr.internal.Logger; import jdk.jfr.internal.LongMap; +import jdk.jfr.internal.SecuritySupport; import jdk.jfr.internal.Utils; import jdk.jfr.internal.consumer.InternalEventFilter; @@ -59,18 +60,18 @@ * - security * */ -abstract class AbstractEventStream implements Runnable { +abstract class AbstractEventStream implements EventStream { - public static final class StreamConfiguration { + protected static final class StreamConfiguration { private static final Runnable[] NO_ACTIONS = new Runnable[0]; + private Consumer[] errorActions = new Consumer[0]; private Runnable[] flushActions = NO_ACTIONS; private Runnable[] closeActions = NO_ACTIONS; - private Runnable[] errorActions = NO_ACTIONS; - - private EventDispatcher[] dispatchers = NO_DISPATCHERS; + private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS; private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; private LongMap dispatcherLookup = new LongMap<>(); + private boolean changedConfiguration = false; private boolean closed = false; private boolean reuse = true; @@ -84,6 +85,7 @@ public StreamConfiguration(StreamConfiguration configuration) { this.flushActions = configuration.flushActions; this.closeActions = configuration.closeActions; + this.errorActions = configuration.errorActions; this.dispatchers = configuration.dispatchers; this.eventFilter = configuration.eventFilter; this.closed = configuration.closed; @@ -100,50 +102,50 @@ public StreamConfiguration() { } - final public StreamConfiguration remove(Object action) { + public StreamConfiguration remove(Object action) { flushActions = remove(flushActions, action); closeActions = remove(closeActions, action); dispatchers = removeDispatch(dispatchers, action); return this; } - final public StreamConfiguration addDispatcher(EventDispatcher e) { + public StreamConfiguration addDispatcher(EventDispatcher e) { dispatchers = add(dispatchers, e); eventFilter = buildFilter(dispatchers); dispatcherLookup = new LongMap<>(); return this; } - final public StreamConfiguration addFlushAction(Runnable action) { + public StreamConfiguration addFlushAction(Runnable action) { flushActions = add(flushActions, action); return this; } - final public StreamConfiguration addCloseAction(Runnable action) { + public StreamConfiguration addCloseAction(Runnable action) { closeActions = add(closeActions, action); return this; } - public StreamConfiguration addErrorAction(Runnable action) { + public StreamConfiguration addErrorAction(Consumer action) { errorActions = add(errorActions, action); return this; } - final public StreamConfiguration setClosed(boolean closed) { + public StreamConfiguration setClosed(boolean closed) { this.closed = closed; changedConfiguration = true; return this; } - final public boolean isClosed() { + public boolean isClosed() { return closed; } - final public Runnable[] getCloseActions() { + public Runnable[] getCloseActions() { return closeActions; } - final public Runnable[] getFlushActions() { + public Runnable[] getFlushActions() { return flushActions; } @@ -197,13 +199,13 @@ return ef; } - final public StreamConfiguration setReuse(boolean reuse) { + public StreamConfiguration setReuse(boolean reuse) { this.reuse = reuse; changedConfiguration = true; return this; } - final public StreamConfiguration setOrdered(boolean ordered) { + public StreamConfiguration setOrdered(boolean ordered) { this.ordered = ordered; changedConfiguration = true; return this; @@ -216,14 +218,14 @@ return this; } - final public StreamConfiguration setStartTime(Instant startTime) { + public StreamConfiguration setStartTime(Instant startTime) { this.startTime = startTime; this.startNanos = Utils.timeToNanos(startTime); changedConfiguration = true; return this; } - final public Instant getStartTime() { + public Instant getStartTime() { return startTime; } @@ -231,50 +233,50 @@ return endTime; } - final public boolean isStarted() { + public boolean isStarted() { return started; } - final public StreamConfiguration setStartNanos(long startNanos) { + public StreamConfiguration setStartNanos(long startNanos) { this.startNanos = startNanos; changedConfiguration = true; return this; } - final public void setStarted(boolean started) { + public void setStarted(boolean started) { this.started = started; changedConfiguration = true; } - final public boolean hasChanged() { + public boolean hasChanged() { return changedConfiguration; } - final public boolean getReuse() { + public boolean getReuse() { return reuse; } - final public boolean getOrdered() { + public boolean getOrdered() { return ordered; } - final public InternalEventFilter getFiler() { + public InternalEventFilter getFiler() { return eventFilter; } - final public long getStartNanos() { + public long getStartNanos() { return startNanos; } - final public long getEndNanos() { + public long getEndNanos() { return endNanos; } - final public InternalEventFilter getFilter() { + public InternalEventFilter getFilter() { return eventFilter; } - final public String toString() { + public String toString() { StringBuilder sb = new StringBuilder(); for (Runnable flush : flushActions) { sb.append("Flush Action: ").append(flush).append("\n"); @@ -282,6 +284,9 @@ for (Runnable close : closeActions) { sb.append("Close Action: " + close + "\n"); } + for (Consumer error : errorActions) { + sb.append("Error Action: " + error + "\n"); + } for (EventDispatcher dispatcher : dispatchers) { sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n"); } @@ -301,9 +306,8 @@ } } - final static class EventDispatcher { + private final static class EventDispatcher { final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; - final private String eventName; final private Consumer action; @@ -325,13 +329,13 @@ } } - public final static Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); + final static Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); - private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; - private final AccessControlContext accessControlContext; private final Thread thread; private final boolean active; - protected final Runnable flushOperation = () -> runFlushActions(); + private final Runnable flushOperation = () -> runFlushActions(); + private final AccessControlContext accessControllerContext; + private final Object configurationLock = new Object(); // Modified by updateConfiguration() protected volatile StreamConfiguration configuration = new StreamConfiguration(); @@ -341,21 +345,231 @@ private EventDispatcher[] lastEventDispatch; public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException { - this.accessControlContext = acc; + this.accessControllerContext = Objects.requireNonNull(acc); this.active = active; - // Create thread object in constructor to ensure caller has - // permission before constructing object - thread = new Thread(this); + this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc)); + } + + @Override + abstract public void start(); + + @Override + abstract public void startAsync(); + + @Override + abstract public void close(); + + // Purpose of synchronizing the following methods is + // to serialize changes to the configuration, so only one + // thread at a time can change the configuration. + // + // The purpose is not to guard the configuration field. A new + // configuration is published using updateConfiguration + // + @Override + public final void setOrdered(boolean ordered) { + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered)); + } + } + + @Override + public final void setReuse(boolean reuse) { + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse)); + } + } + + @Override + public final void setStartTime(Instant startTime) { + Objects.nonNull(startTime); + synchronized (configurationLock) { + if (configuration.isStarted()) { + throw new IllegalStateException("Stream is already started"); + } + if (startTime.isBefore(Instant.EPOCH)) { + startTime = Instant.EPOCH; + } + updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime)); + } + } + + @Override + public final void setEndTime(Instant endTime) { + Objects.requireNonNull(endTime); + synchronized (configurationLock) { + if (configuration.isStarted()) { + throw new IllegalStateException("Stream is already started"); + } + updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime)); + } + } + + @Override + public final void onEvent(Consumer action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + add(new EventDispatcher(action)); + } + } + + @Override + public final void onEvent(String eventName, Consumer action) { + Objects.requireNonNull(eventName); + Objects.requireNonNull(action); + synchronized (configurationLock) { + add(new EventDispatcher(eventName, action)); + } + } + + @Override + public final void onFlush(Runnable action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action)); + } + } + + @Override + public final void onClose(Runnable action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action)); + } + } + + @Override + public final void onError(Consumer action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action)); + } + } + + @Override + public final boolean remove(Object action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + return updateConfiguration(new StreamConfiguration(configuration).remove(action)); + } } - public final void run() { - AccessController.doPrivileged(new PrivilegedAction() { - @Override - public Void run() { - execute(); - return null; + @Override + public final void awaitTermination() { + awaitTermination(Duration.ofMillis(0)); + } + + @Override + public final void awaitTermination(Duration timeout) { + Objects.requireNonNull(timeout); + if (thread != Thread.currentThread()) { + try { + thread.join(timeout.toMillis()); + } catch (InterruptedException e) { + // ignore + } + } + } + + protected abstract void process() throws Exception; + + protected final void clearLastDispatch() { + lastEventDispatch = null; + lastEventType = null; + } + + protected final void dispatch(RecordedEvent event) { + EventType type = event.getEventType(); + EventDispatcher[] dispatchers = null; + if (type == lastEventType) { + dispatchers = lastEventDispatch; + } else { + dispatchers = configuration.dispatcherLookup.get(type.getId()); + if (dispatchers == null) { + List list = new ArrayList<>(); + for (EventDispatcher e : configuration.getDispatchers()) { + if (e.accepts(type)) { + list.add(e); + } + } + dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]); + configuration.dispatcherLookup.put(type.getId(), dispatchers); + } + lastEventDispatch = dispatchers; + } + for (int i = 0; i < dispatchers.length; i++) { + try { + dispatchers[i].offer(event); + } catch (Exception e) { + handleError(e); } - }, accessControlContext); + } + } + + protected final void runCloseActions() { + Runnable[] closeActions = configuration.getCloseActions(); + for (int i = 0; i < closeActions.length; i++) { + try { + closeActions[i].run(); + } catch (Exception e) { + handleError(e); + } + } + } + + protected final void setClosed(boolean closed) { + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).setClosed(closed)); + } + } + + protected final boolean isClosed() { + return configuration.isClosed(); + } + + protected final void startAsync(long startNanos) { + startInternal(startNanos); + thread.start(); + } + + protected final void start(long startNanos) { + startInternal(startNanos); + run(accessControllerContext); + } + + protected final Runnable getFlushOperation() { + return flushOperation; + } + + private void add(EventDispatcher e) { + updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e)); + } + + private boolean updateConfiguration(StreamConfiguration newConfiguration) { + if (!Thread.holdsLock(configurationLock)) { + throw new InternalError("Modification of configuration without proper lock"); + } + if (newConfiguration.hasChanged()) { + // Publish objects held by configuration object + VarHandle.releaseFence(); + configuration = newConfiguration; + return true; + } + return false; + } + + private void startInternal(long startNanos) { + synchronized (configurationLock) { + if (configuration.isStarted()) { + throw new IllegalStateException("Event stream can only be started once"); + } + StreamConfiguration c = new StreamConfiguration(configuration); + if (active) { + c.setStartNanos(startNanos); + } + c.setStarted(true); + updateConfiguration(c); + } } private void execute() { @@ -369,202 +583,41 @@ } } - public abstract void process() throws Exception; - - protected final void clearLastDispatch() { - lastEventDispatch = null; - lastEventType = null; - } - - protected final void dispatch(RecordedEvent event) { - EventType type = event.getEventType(); - EventDispatcher[] ret = null; - if (type == lastEventType) { - ret = lastEventDispatch; - } else { - ret = configuration.dispatcherLookup.get(type.getId()); - if (ret == null) { - List list = new ArrayList<>(); - for (EventDispatcher e : configuration.getDispatchers()) { - if (e.accepts(type)) { - list.add(e); - } - } - ret = list.isEmpty() ? NO_DISPATCHERS : list.toArray(new EventDispatcher[0]); - configuration.dispatcherLookup.put(type.getId(), ret); - } - lastEventDispatch = ret; + private void handleError(Throwable e) { + Consumer[] consumers = configuration.errorActions; + if (consumers.length == 0) { + defaultErrorHandler(e); + return; } - for (int i = 0; i < ret.length; i++) { - try { - ret[i].offer(event); - } catch (Exception e) { - handleError(e); - } + for (int i = 0; i < consumers.length; i++) { + @SuppressWarnings("unchecked") + Consumer c = (Consumer) consumers[i]; + c.accept(e); } } - protected final void handleError(Throwable e) { - StreamConfiguration c = configuration; - if (c.errorActions.length == 0) { - defaultErrorHandler(e); - return; - } - for (Runnable r : c.errorActions) { - r.run(); - } - } - - protected final void defaultErrorHandler(Throwable e) { + private void defaultErrorHandler(Throwable e) { e.printStackTrace(); } - public final void runCloseActions() { - Runnable[] cas = configuration.getCloseActions(); - for (int i = 0; i < cas.length; i++) { + private void runFlushActions() { + Runnable[] flushActions = configuration.getFlushActions(); + for (int i = 0; i < flushActions.length; i++) { try { - cas[i].run(); + flushActions[i].run(); } catch (Exception e) { handleError(e); } } } - public final void runFlushActions() { - Runnable[] fas = configuration.getFlushActions(); - for (int i = 0; i < fas.length; i++) { - try { - fas[i].run(); - } catch (Exception e) { - handleError(e); + private void run(AccessControlContext acc) { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + execute(); + return null; } - } - - } - - // Purpose of synchronizing the following methods is - // to serialize changes to the configuration, so only one - // thread at a time can change the configuration. - // - // The purpose is not to guard the configuration field. A new - // configuration is published using updateConfiguration - // - public final synchronized boolean remove(Object action) { - return updateConfiguration(new StreamConfiguration(configuration).remove(action)); - } - - public final synchronized void onEvent(Consumer action) { - add(new EventDispatcher(action)); - } - - public final synchronized void onEvent(String eventName, Consumer action) { - add(new EventDispatcher(eventName, action)); - } - - private final synchronized void add(EventDispatcher e) { - updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e)); - } - - public final synchronized void onFlush(Runnable action) { - updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action)); - } - - public final synchronized void addCloseAction(Runnable action) { - updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action)); - } - - public final synchronized void addErrorAction(Runnable action) { - updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action)); - } - - public final synchronized void setClosed(boolean closed) { - updateConfiguration(new StreamConfiguration(configuration).setClosed(closed)); - } - - public final synchronized void setReuse(boolean reuse) { - updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse)); - } - - public final synchronized void setOrdered(boolean ordered) { - updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered)); - } - - public final synchronized void setStartNanos(long startNanos) { - updateConfiguration(new StreamConfiguration(configuration).setStartNanos(startNanos)); + }, acc); } - - public final synchronized void setStartTime(Instant startTime) { - Objects.nonNull(startTime); - if (configuration.isStarted()) { - throw new IllegalStateException("Stream is already started"); - } - if (startTime.isBefore(Instant.EPOCH)) { - startTime = Instant.EPOCH; - } - updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime)); - } - - public final synchronized void setEndTime(Instant endTime) { - if (configuration.isStarted()) { - throw new IllegalStateException("Stream is already started"); - } - updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime)); - } - - protected boolean updateConfiguration(StreamConfiguration newConfiguration) { - if (!Thread.holdsLock(this)) { - throw new InternalError("Modification of configuration without proper lock"); - } - if (newConfiguration.hasChanged()) { - // Publish objects held by configuration object - VarHandle.releaseFence(); - configuration = newConfiguration; - return true; - } - return false; - } - - public final boolean isClosed() { - return configuration.isClosed(); - } - - public final void startAsync(long startNanos) { - startInternal(startNanos); - thread.start(); - } - - public final void start(long startNanos) { - startInternal(startNanos); - run(); - } - - private synchronized void startInternal(long startNanos) { - if (configuration.isStarted()) { - throw new IllegalStateException("Event stream can only be started once"); - } - StreamConfiguration c = new StreamConfiguration(configuration); - if (active) { - c.setStartNanos(startNanos); - } - c.setStarted(true); - updateConfiguration(c); - } - - public final void awaitTermination(Duration timeout) { - Objects.requireNonNull(timeout); - if (thread != Thread.currentThread()) { - try { - thread.join(timeout.toMillis()); - } catch (InterruptedException e) { - // ignore - } - } - } - - public final void awaitTermination() { - awaitTermination(Duration.ofMillis(0)); - } - - abstract public void close(); - } \ No newline at end of file