--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Fri Sep 13 18:46:07 2019 +0200
@@ -26,20 +26,16 @@
package jdk.jfr.consumer;
import java.io.IOException;
-import java.lang.invoke.VarHandle;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.Comparator;
-import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import jdk.jfr.EventType;
import jdk.jfr.internal.JVM;
import jdk.jfr.internal.LogLevel;
import jdk.jfr.internal.LogTag;
@@ -59,40 +55,19 @@
*/
abstract class AbstractEventStream implements EventStream {
- final static class EventDispatcher {
- final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
- final String eventName;
- final Consumer<RecordedEvent> action;
-
- public EventDispatcher(Consumer<RecordedEvent> action) {
- this(null, action);
- }
+ static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
- public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
- this.eventName = eventName;
- this.action = action;
- }
-
- public void offer(RecordedEvent event) {
- action.accept(event);
- }
-
- public boolean accepts(EventType eventType) {
- return (eventName == null || eventType.getName().equals(eventName));
- }
- }
-
- final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
private final static AtomicLong counter = new AtomicLong(1);
- private volatile Thread thread;
private final Object terminated = new Object();
private final boolean active;
- private final Runnable flushOperation = () -> runFlushActions();
+ private final Runnable flushOperation = () -> dispatcher().runFlushActions();
private final AccessControlContext accessControllerContext;
- private final Object configurationLock = new Object();
+ private final StreamConfiguration configuration = new StreamConfiguration();
- // Modified by updateConfiguration()
- protected volatile StreamConfiguration configuration = new StreamConfiguration();
+ private volatile Thread thread;
+ private Dispatcher dispatcher;
+
+ private volatile boolean closed;
public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
this.accessControllerContext = Objects.requireNonNull(acc);
@@ -108,99 +83,85 @@
@Override
abstract public void close();
- // Purpose of synchronizing the following methods is
- // to serialize changes to the configuration so only one
- // thread at a time can change the configuration.
- //
- // The purpose is not to guard the configuration field. A new
- // configuration is published using updateConfiguration
- //
+ protected final Dispatcher dispatcher() {
+ if (configuration.hasChanged()) {
+ synchronized (configuration) {
+ dispatcher = new Dispatcher(configuration);
+ }
+ }
+ return dispatcher;
+ }
+
@Override
public final void setOrdered(boolean ordered) {
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
- }
+ configuration.setOrdered(ordered);
}
@Override
public final void setReuse(boolean reuse) {
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
- }
+ configuration.setReuse(reuse);
}
@Override
public final void setStartTime(Instant startTime) {
Objects.nonNull(startTime);
- synchronized (configurationLock) {
- if (configuration.isStarted()) {
+ synchronized (configuration) {
+ if (configuration.started) {
throw new IllegalStateException("Stream is already started");
}
if (startTime.isBefore(Instant.EPOCH)) {
startTime = Instant.EPOCH;
}
- updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
+ configuration.setStartTime(startTime);
}
}
@Override
public final void setEndTime(Instant endTime) {
Objects.requireNonNull(endTime);
- synchronized (configurationLock) {
- if (configuration.isStarted()) {
+ synchronized (configuration) {
+ if (configuration.started) {
throw new IllegalStateException("Stream is already started");
}
- updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
+ configuration.setEndTime(endTime);
}
}
@Override
public final void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- add(new EventDispatcher(action));
- }
+ configuration.addEventAction(action);
}
@Override
public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- add(new EventDispatcher(eventName, action));
- }
+ configuration.addEventAction(eventName, action);
}
@Override
public final void onFlush(Runnable action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
- }
+ configuration.addFlushAction(action);
}
@Override
public final void onClose(Runnable action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
- }
+ configuration.addCloseAction(action);
}
@Override
public final void onError(Consumer<Throwable> action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
- }
+ configuration.addErrorAction(action);
}
@Override
public final boolean remove(Object action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- return updateConfiguration(new StreamConfiguration(configuration).remove(action));
- }
+ return configuration.remove(action);
}
@Override
@@ -247,53 +208,12 @@
protected abstract void process() throws Exception;
- protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
- EventType type = event.getEventType();
- EventDispatcher[] dispatchers = null;
- if (type == c.cacheEventType) {
- dispatchers = c.cacheDispatchers;
- } else {
- dispatchers = c.dispatcherLookup.get(type.getId());
- if (dispatchers == null) {
- List<EventDispatcher> list = new ArrayList<>();
- for (EventDispatcher e : c.getDispatchers()) {
- if (e.accepts(type)) {
- list.add(e);
- }
- }
- dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
- c.dispatcherLookup.put(type.getId(), dispatchers);
- }
- c.cacheDispatchers = dispatchers;
- }
- for (int i = 0; i < dispatchers.length; i++) {
- try {
- dispatchers[i].offer(event);
- } catch (Exception e) {
- handleError(e);
- }
- }
- }
-
- protected final void runCloseActions() {
- Runnable[] closeActions = configuration.getCloseActions();
- for (int i = 0; i < closeActions.length; i++) {
- try {
- closeActions[i].run();
- } catch (Exception e) {
- handleError(e);
- }
- }
- }
-
protected final void setClosed(boolean closed) {
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
- }
+ this.closed = closed;
}
protected final boolean isClosed() {
- return configuration.isClosed();
+ return closed;
}
protected final void startAsync(long startNanos) {
@@ -313,34 +233,15 @@
return flushOperation;
}
- private void add(EventDispatcher e) {
- updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
- }
-
- private boolean updateConfiguration(StreamConfiguration newConfiguration) {
- if (!Thread.holdsLock(configurationLock)) {
- throw new InternalError("Modification of configuration without proper lock");
- }
- if (newConfiguration.hasChanged()) {
- // Publish objects held by configuration object
- VarHandle.releaseFence();
- configuration = newConfiguration;
- return true;
- }
- return false;
- }
-
private void startInternal(long startNanos) {
- synchronized (configurationLock) {
- if (configuration.isStarted()) {
+ synchronized (configuration) {
+ if (configuration.started) {
throw new IllegalStateException("Event stream can only be started once");
}
- StreamConfiguration c = new StreamConfiguration(configuration);
if (active) {
- c.setStartNanos(startNanos);
+ configuration.setStartNanos(startNanos);
}
- c.setStarted(true);
- updateConfiguration(c);
+ configuration.setStarted(true);
}
}
@@ -348,8 +249,14 @@
JVM.getJVM().exclude(Thread.currentThread());
try {
process();
+ } catch (IOException ioe) {
+ // This can happen if a chunk file is removed, or
+ // a file is access that has been closed
+ // This is "normal" behavior for streaming and the
+ // stream will be closed when this happens
} catch (Exception e) {
- defaultErrorHandler(e);
+ // TODO: Remove before integrating
+ e.printStackTrace();
} finally {
Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
try {
@@ -362,30 +269,6 @@
}
}
- private void handleError(Throwable e) {
- Consumer<?>[] consumers = configuration.errorActions;
- if (consumers.length == 0) {
- defaultErrorHandler(e);
- return;
- }
- for (int i = 0; i < consumers.length; i++) {
- @SuppressWarnings("unchecked")
- Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
- conusmer.accept(e);
- }
- }
-
- private void runFlushActions() {
- Runnable[] flushActions = configuration.getFlushActions();
- for (int i = 0; i < flushActions.length; i++) {
- try {
- flushActions[i].run();
- } catch (Exception e) {
- handleError(e);
- }
- }
- }
-
private void run(AccessControlContext accessControlContext) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
@@ -400,8 +283,4 @@
counter.incrementAndGet();
return "JFR Event Stream " + counter;
}
-
- private void defaultErrorHandler(Throwable e) {
- e.printStackTrace();
- }
}
\ No newline at end of file