# HG changeset patch # User egahlin # Date 1567190378 -7200 # Node ID aa7b1ea52413c961e5a9ee0353d5927689e50739 # Parent 74a38c0b5054170a799cdf37a6e0f0190fa46b33 Add onError handler + various cleanup diff -r 74a38c0b5054 -r aa7b1ea52413 src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Thu Aug 29 19:40:37 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Fri Aug 30 20:39:38 2019 +0200 @@ -66,8 +66,12 @@ private Runnable[] flushActions = NO_ACTIONS; private Runnable[] closeActions = NO_ACTIONS; + private Runnable[] errorActions = NO_ACTIONS; + private EventDispatcher[] dispatchers = NO_DISPATCHERS; private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; + private LongMap dispatcherLookup = new LongMap<>(); + private boolean changedConfiguration = false; private boolean closed = false; private boolean reuse = true; private boolean ordered = true; @@ -76,8 +80,6 @@ private boolean started = false; private long startNanos = 0; private long endNanos = Long.MAX_VALUE; - private LongMap dispatcherLookup = new LongMap<>(); - private boolean changed = false; public StreamConfiguration(StreamConfiguration configuration) { this.flushActions = configuration.flushActions; @@ -122,9 +124,14 @@ return this; } + public StreamConfiguration addErrorAction(Runnable action) { + errorActions = add(errorActions, action); + return this; + } + final public StreamConfiguration setClosed(boolean closed) { this.closed = closed; - changed = true; + changedConfiguration = true; return this; } @@ -154,7 +161,7 @@ if (modified) { eventFilter = buildFilter(result); dispatcherLookup = new LongMap<>(); - changed = true; + changedConfiguration = true; } return result; } @@ -165,7 +172,7 @@ if (array[i] != action) { list.add(array[i]); } else { - changed = true; + changedConfiguration = true; } } return list.toArray(array); @@ -174,7 +181,7 @@ private T[] add(T[] array, T object) { List list = new ArrayList<>(Arrays.asList(array)); list.add(object); - changed = true; + changedConfiguration = true; return list.toArray(array); } @@ -192,26 +199,27 @@ final public StreamConfiguration setReuse(boolean reuse) { this.reuse = reuse; - changed = true; + changedConfiguration = true; return this; } final public StreamConfiguration setOrdered(boolean ordered) { this.ordered = ordered; - changed = true; + changedConfiguration = true; return this; } + public StreamConfiguration setEndTime(Instant endTime) { this.endTime = endTime; this.endNanos = Utils.timeToNanos(endTime); - changed = true; + changedConfiguration = true; return this; } final public StreamConfiguration setStartTime(Instant startTime) { this.startTime = startTime; this.startNanos = Utils.timeToNanos(startTime); - changed = true; + changedConfiguration = true; return this; } @@ -229,17 +237,17 @@ final public StreamConfiguration setStartNanos(long startNanos) { this.startNanos = startNanos; - changed = true; + changedConfiguration = true; return this; } final public void setStarted(boolean started) { this.started = started; - changed = true; + changedConfiguration = true; } final public boolean hasChanged() { - return changed; + return changedConfiguration; } final public boolean getReuse() { @@ -291,10 +299,6 @@ private EventDispatcher[] getDispatchers() { return dispatchers; } - - - - } final static class EventDispatcher { @@ -329,7 +333,7 @@ private final boolean active; protected final Runnable flushOperation = () -> runFlushActions(); - // Updated by updateConfiguration() + // Modified by updateConfiguration() protected volatile StreamConfiguration configuration = new StreamConfiguration(); // Cache the last event type and dispatch. @@ -352,32 +356,20 @@ return null; } }, accessControlContext); - } private void execute() { JVM.getJVM().exclude(Thread.currentThread()); try { process(); - } catch (IOException e) { - if (!isClosed()) { - logException(e); - } } catch (Exception e) { - logException(e); + defaultErrorHandler(e); } finally { Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); } } - private void logException(Exception e) { - // FIXME: e.printStackTrace(); for debugging purposes, - // remove before before integration - e.printStackTrace(); - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage()); - } - - public abstract void process() throws IOException; + public abstract void process() throws Exception; protected final void clearLastDispatch() { lastEventDispatch = null; @@ -407,18 +399,33 @@ try { ret[i].offer(event); } catch (Exception e) { - logException(e); + handleError(e); } } } + protected final void handleError(Throwable e) { + StreamConfiguration c = configuration; + if (c.errorActions.length == 0) { + defaultErrorHandler(e); + return; + } + for (Runnable r : c.errorActions) { + r.run(); + } + } + + protected final void defaultErrorHandler(Throwable e) { + e.printStackTrace(); + } + public final void runCloseActions() { Runnable[] cas = configuration.getCloseActions(); for (int i = 0; i < cas.length; i++) { try { cas[i].run(); } catch (Exception e) { - logException(e); + handleError(e); } } } @@ -429,9 +436,10 @@ try { fas[i].run(); } catch (Exception e) { - logException(e); + handleError(e); } } + } // Purpose of synchronizing the following methods is @@ -465,6 +473,10 @@ updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action)); } + public final synchronized void addErrorAction(Runnable action) { + updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action)); + } + public final synchronized void setClosed(boolean closed) { updateConfiguration(new StreamConfiguration(configuration).setClosed(closed)); } @@ -492,18 +504,17 @@ updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime)); } - public final void setEndTime(Instant endTime) { - if (configuration.isStarted()) { - throw new IllegalStateException("Stream is already started"); + public final synchronized void setEndTime(Instant endTime) { + if (configuration.isStarted()) { + throw new IllegalStateException("Stream is already started"); + } + updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime)); } - updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime)); -} - protected boolean updateConfiguration(StreamConfiguration newConfiguration) { - // Changes to the configuration must happen one at a time, so make - // sure that we have the monitor - Thread.holdsLock(this); + if (!Thread.holdsLock(this)) { + throw new InternalError("Modification of configuration without proper lock"); + } if (newConfiguration.hasChanged()) { // Publish objects held by configuration object VarHandle.releaseFence(); @@ -527,18 +538,16 @@ run(); } - private void startInternal(long startNanos) { - synchronized (this) { - if (configuration.isStarted()) { - throw new IllegalStateException("Event stream can only be started once"); - } - StreamConfiguration c = new StreamConfiguration(configuration); - if (active) { - c.setStartNanos(startNanos); - } - c.setStarted(true); - updateConfiguration(c); + private synchronized void startInternal(long startNanos) { + if (configuration.isStarted()) { + throw new IllegalStateException("Event stream can only be started once"); } + StreamConfiguration c = new StreamConfiguration(configuration); + if (active) { + c.setStartNanos(startNanos); + } + c.setStarted(true); + updateConfiguration(c); } public final void awaitTermination(Duration timeout) { diff -r 74a38c0b5054 -r aa7b1ea52413 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Aug 29 19:40:37 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Aug 30 20:39:38 2019 +0200 @@ -67,7 +67,7 @@ } @Override - public void process() throws IOException { + public void process() throws Exception { final StreamConfiguration c1 = configuration; Path path; boolean validStartTime = active || c1.getStartTime() != null; @@ -272,5 +272,11 @@ eventStream.startAsync(startNanos); } + @Override + public void onError(Consumer action) { + // TODO Auto-generated method stub + + } + } diff -r 74a38c0b5054 -r aa7b1ea52413 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu Aug 29 19:40:37 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Aug 30 20:39:38 2019 +0200 @@ -228,4 +228,10 @@ public void setEndTime(Instant endTime) { eventStream.setEndTime(endTime); } + + @Override + public void onError(Consumer action) { + // TODO Auto-generated method stub + + } } diff -r 74a38c0b5054 -r aa7b1ea52413 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Thu Aug 29 19:40:37 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri Aug 30 20:39:38 2019 +0200 @@ -133,6 +133,21 @@ void onFlush(Runnable action); /** + * Performs an action if an exception occurs when processing the stream. + *

+ * if an error handler has not been added to the stream, an exception stack + * trace is printed to standard error. + *

+ * Adding an error handler overrides the default behavior. If multiple error + * handlers have been added, they will be executed in the order they were + * added. + * + * @param action an action to be performed if an exception occurs, not + * {@code null} + */ + void onError(Consumer action); + + /** * Performs an action when the event stream is closed. *

* If the stream is already closed, the action will be executed immediately diff -r 74a38c0b5054 -r aa7b1ea52413 src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Thu Aug 29 19:40:37 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Fri Aug 30 20:39:38 2019 +0200 @@ -362,4 +362,9 @@ public void setEndTime(Instant endTime) { stream.setStartTime(endTime); } + + @Override + public void onError(Consumer action) { + stream.onError(action); + } } diff -r 74a38c0b5054 -r aa7b1ea52413 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java Thu Aug 29 19:40:37 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java Fri Aug 30 20:39:38 2019 +0200 @@ -52,9 +52,9 @@ private final RecordingInput input; private final long id; private long absoluteEventStart; - private long chunkSize; - private long constantPoolPosition; - private long metadataPosition; + private long chunkSize = 0; + private long constantPoolPosition = 0; + private long metadataPosition = 0; private long durationNanos; private long absoluteChunkEnd; private boolean isFinished; diff -r 74a38c0b5054 -r aa7b1ea52413 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInternals.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInternals.java Thu Aug 29 19:40:37 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInternals.java Fri Aug 30 20:39:38 2019 +0200 @@ -41,7 +41,7 @@ Class c = RecordedObject.class; Class.forName(c.getName(), true, c.getClassLoader()); } catch (ClassNotFoundException e) { - new InternalError("shuld not happen"); + throw new InternalError("Should not happen"); } } return INSTANCE;