--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Fri Aug 30 20:39:38 2019 +0200
@@ -66,8 +66,12 @@
private Runnable[] flushActions = NO_ACTIONS;
private Runnable[] closeActions = NO_ACTIONS;
+ private Runnable[] errorActions = NO_ACTIONS;
+
private EventDispatcher[] dispatchers = NO_DISPATCHERS;
private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
+ private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+ private boolean changedConfiguration = false;
private boolean closed = false;
private boolean reuse = true;
private boolean ordered = true;
@@ -76,8 +80,6 @@
private boolean started = false;
private long startNanos = 0;
private long endNanos = Long.MAX_VALUE;
- private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
- private boolean changed = false;
public StreamConfiguration(StreamConfiguration configuration) {
this.flushActions = configuration.flushActions;
@@ -122,9 +124,14 @@
return this;
}
+ public StreamConfiguration addErrorAction(Runnable action) {
+ errorActions = add(errorActions, action);
+ return this;
+ }
+
final public StreamConfiguration setClosed(boolean closed) {
this.closed = closed;
- changed = true;
+ changedConfiguration = true;
return this;
}
@@ -154,7 +161,7 @@
if (modified) {
eventFilter = buildFilter(result);
dispatcherLookup = new LongMap<>();
- changed = true;
+ changedConfiguration = true;
}
return result;
}
@@ -165,7 +172,7 @@
if (array[i] != action) {
list.add(array[i]);
} else {
- changed = true;
+ changedConfiguration = true;
}
}
return list.toArray(array);
@@ -174,7 +181,7 @@
private <T> T[] add(T[] array, T object) {
List<T> list = new ArrayList<>(Arrays.asList(array));
list.add(object);
- changed = true;
+ changedConfiguration = true;
return list.toArray(array);
}
@@ -192,26 +199,27 @@
final public StreamConfiguration setReuse(boolean reuse) {
this.reuse = reuse;
- changed = true;
+ changedConfiguration = true;
return this;
}
final public StreamConfiguration setOrdered(boolean ordered) {
this.ordered = ordered;
- changed = true;
+ changedConfiguration = true;
return this;
}
+
public StreamConfiguration setEndTime(Instant endTime) {
this.endTime = endTime;
this.endNanos = Utils.timeToNanos(endTime);
- changed = true;
+ changedConfiguration = true;
return this;
}
final public StreamConfiguration setStartTime(Instant startTime) {
this.startTime = startTime;
this.startNanos = Utils.timeToNanos(startTime);
- changed = true;
+ changedConfiguration = true;
return this;
}
@@ -229,17 +237,17 @@
final public StreamConfiguration setStartNanos(long startNanos) {
this.startNanos = startNanos;
- changed = true;
+ changedConfiguration = true;
return this;
}
final public void setStarted(boolean started) {
this.started = started;
- changed = true;
+ changedConfiguration = true;
}
final public boolean hasChanged() {
- return changed;
+ return changedConfiguration;
}
final public boolean getReuse() {
@@ -291,10 +299,6 @@
private EventDispatcher[] getDispatchers() {
return dispatchers;
}
-
-
-
-
}
final static class EventDispatcher {
@@ -329,7 +333,7 @@
private final boolean active;
protected final Runnable flushOperation = () -> runFlushActions();
- // Updated by updateConfiguration()
+ // Modified by updateConfiguration()
protected volatile StreamConfiguration configuration = new StreamConfiguration();
// Cache the last event type and dispatch.
@@ -352,32 +356,20 @@
return null;
}
}, accessControlContext);
-
}
private void execute() {
JVM.getJVM().exclude(Thread.currentThread());
try {
process();
- } catch (IOException e) {
- if (!isClosed()) {
- logException(e);
- }
} catch (Exception e) {
- logException(e);
+ defaultErrorHandler(e);
} finally {
Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
}
}
- private void logException(Exception e) {
- // FIXME: e.printStackTrace(); for debugging purposes,
- // remove before before integration
- e.printStackTrace();
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
- }
-
- public abstract void process() throws IOException;
+ public abstract void process() throws Exception;
protected final void clearLastDispatch() {
lastEventDispatch = null;
@@ -407,18 +399,33 @@
try {
ret[i].offer(event);
} catch (Exception e) {
- logException(e);
+ handleError(e);
}
}
}
+ protected final void handleError(Throwable e) {
+ StreamConfiguration c = configuration;
+ if (c.errorActions.length == 0) {
+ defaultErrorHandler(e);
+ return;
+ }
+ for (Runnable r : c.errorActions) {
+ r.run();
+ }
+ }
+
+ protected final void defaultErrorHandler(Throwable e) {
+ e.printStackTrace();
+ }
+
public final void runCloseActions() {
Runnable[] cas = configuration.getCloseActions();
for (int i = 0; i < cas.length; i++) {
try {
cas[i].run();
} catch (Exception e) {
- logException(e);
+ handleError(e);
}
}
}
@@ -429,9 +436,10 @@
try {
fas[i].run();
} catch (Exception e) {
- logException(e);
+ handleError(e);
}
}
+
}
// Purpose of synchronizing the following methods is
@@ -465,6 +473,10 @@
updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
}
+ public final synchronized void addErrorAction(Runnable action) {
+ updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
+ }
+
public final synchronized void setClosed(boolean closed) {
updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
}
@@ -492,18 +504,17 @@
updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
}
- public final void setEndTime(Instant endTime) {
- if (configuration.isStarted()) {
- throw new IllegalStateException("Stream is already started");
+ public final synchronized void setEndTime(Instant endTime) {
+ if (configuration.isStarted()) {
+ throw new IllegalStateException("Stream is already started");
+ }
+ updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
}
- updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
-}
-
protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
- // Changes to the configuration must happen one at a time, so make
- // sure that we have the monitor
- Thread.holdsLock(this);
+ if (!Thread.holdsLock(this)) {
+ throw new InternalError("Modification of configuration without proper lock");
+ }
if (newConfiguration.hasChanged()) {
// Publish objects held by configuration object
VarHandle.releaseFence();
@@ -527,18 +538,16 @@
run();
}
- private void startInternal(long startNanos) {
- synchronized (this) {
- if (configuration.isStarted()) {
- throw new IllegalStateException("Event stream can only be started once");
- }
- StreamConfiguration c = new StreamConfiguration(configuration);
- if (active) {
- c.setStartNanos(startNanos);
- }
- c.setStarted(true);
- updateConfiguration(c);
+ private synchronized void startInternal(long startNanos) {
+ if (configuration.isStarted()) {
+ throw new IllegalStateException("Event stream can only be started once");
}
+ StreamConfiguration c = new StreamConfiguration(configuration);
+ if (active) {
+ c.setStartNanos(startNanos);
+ }
+ c.setStarted(true);
+ updateConfiguration(c);
}
public final void awaitTermination(Duration timeout) {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Aug 30 20:39:38 2019 +0200
@@ -67,7 +67,7 @@
}
@Override
- public void process() throws IOException {
+ public void process() throws Exception {
final StreamConfiguration c1 = configuration;
Path path;
boolean validStartTime = active || c1.getStartTime() != null;
@@ -272,5 +272,11 @@
eventStream.startAsync(startNanos);
}
+ @Override
+ public void onError(Consumer<Throwable> action) {
+ // TODO Auto-generated method stub
+
+ }
+
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Aug 30 20:39:38 2019 +0200
@@ -228,4 +228,10 @@
public void setEndTime(Instant endTime) {
eventStream.setEndTime(endTime);
}
+
+ @Override
+ public void onError(Consumer<Throwable> action) {
+ // TODO Auto-generated method stub
+
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri Aug 30 20:39:38 2019 +0200
@@ -133,6 +133,21 @@
void onFlush(Runnable action);
/**
+ * Performs an action if an exception occurs when processing the stream.
+ * <p>
+ * if an error handler has not been added to the stream, an exception stack
+ * trace is printed to standard error.
+ * <p>
+ * Adding an error handler overrides the default behavior. If multiple error
+ * handlers have been added, they will be executed in the order they were
+ * added.
+ *
+ * @param action an action to be performed if an exception occurs, not
+ * {@code null}
+ */
+ void onError(Consumer<Throwable> action);
+
+ /**
* Performs an action when the event stream is closed.
* <p>
* If the stream is already closed, the action will be executed immediately
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Fri Aug 30 20:39:38 2019 +0200
@@ -362,4 +362,9 @@
public void setEndTime(Instant endTime) {
stream.setStartTime(endTime);
}
+
+ @Override
+ public void onError(Consumer<Throwable> action) {
+ stream.onError(action);
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java Fri Aug 30 20:39:38 2019 +0200
@@ -52,9 +52,9 @@
private final RecordingInput input;
private final long id;
private long absoluteEventStart;
- private long chunkSize;
- private long constantPoolPosition;
- private long metadataPosition;
+ private long chunkSize = 0;
+ private long constantPoolPosition = 0;
+ private long metadataPosition = 0;
private long durationNanos;
private long absoluteChunkEnd;
private boolean isFinished;
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInternals.java Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInternals.java Fri Aug 30 20:39:38 2019 +0200
@@ -41,7 +41,7 @@
Class<?> c = RecordedObject.class;
Class.forName(c.getName(), true, c.getClassLoader());
} catch (ClassNotFoundException e) {
- new InternalError("shuld not happen");
+ throw new InternalError("Should not happen");
}
}
return INSTANCE;