Add onError handler + various cleanup JEP-349-branch
authoregahlin
Fri, 30 Aug 2019 20:39:38 +0200
branchJEP-349-branch
changeset 57971 aa7b1ea52413
parent 57949 74a38c0b5054
child 57983 a57907813a83
Add onError handler + various cleanup
src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInternals.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Fri Aug 30 20:39:38 2019 +0200
@@ -66,8 +66,12 @@
 
         private Runnable[] flushActions = NO_ACTIONS;
         private Runnable[] closeActions = NO_ACTIONS;
+        private Runnable[] errorActions = NO_ACTIONS;
+
         private EventDispatcher[] dispatchers = NO_DISPATCHERS;
         private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
+        private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+        private boolean changedConfiguration = false;
         private boolean closed = false;
         private boolean reuse = true;
         private boolean ordered = true;
@@ -76,8 +80,6 @@
         private boolean started = false;
         private long startNanos = 0;
         private long endNanos = Long.MAX_VALUE;
-        private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
-        private boolean changed = false;
 
         public StreamConfiguration(StreamConfiguration configuration) {
             this.flushActions = configuration.flushActions;
@@ -122,9 +124,14 @@
             return this;
         }
 
+        public StreamConfiguration addErrorAction(Runnable action) {
+            errorActions = add(errorActions, action);
+            return this;
+        }
+
         final public StreamConfiguration setClosed(boolean closed) {
             this.closed = closed;
-            changed = true;
+            changedConfiguration = true;
             return this;
         }
 
@@ -154,7 +161,7 @@
             if (modified) {
                 eventFilter = buildFilter(result);
                 dispatcherLookup = new LongMap<>();
-                changed = true;
+                changedConfiguration = true;
             }
             return result;
         }
@@ -165,7 +172,7 @@
                 if (array[i] != action) {
                     list.add(array[i]);
                 } else {
-                    changed = true;
+                    changedConfiguration = true;
                 }
             }
             return list.toArray(array);
@@ -174,7 +181,7 @@
         private <T> T[] add(T[] array, T object) {
             List<T> list = new ArrayList<>(Arrays.asList(array));
             list.add(object);
-            changed = true;
+            changedConfiguration = true;
             return list.toArray(array);
         }
 
@@ -192,26 +199,27 @@
 
         final public StreamConfiguration setReuse(boolean reuse) {
             this.reuse = reuse;
-            changed = true;
+            changedConfiguration = true;
             return this;
         }
 
         final public StreamConfiguration setOrdered(boolean ordered) {
             this.ordered = ordered;
-            changed = true;
+            changedConfiguration = true;
             return this;
         }
+
         public StreamConfiguration setEndTime(Instant endTime) {
             this.endTime = endTime;
             this.endNanos = Utils.timeToNanos(endTime);
-            changed = true;
+            changedConfiguration = true;
             return this;
         }
 
         final public StreamConfiguration setStartTime(Instant startTime) {
             this.startTime = startTime;
             this.startNanos = Utils.timeToNanos(startTime);
-            changed = true;
+            changedConfiguration = true;
             return this;
         }
 
@@ -229,17 +237,17 @@
 
         final public StreamConfiguration setStartNanos(long startNanos) {
             this.startNanos = startNanos;
-            changed = true;
+            changedConfiguration = true;
             return this;
         }
 
         final public void setStarted(boolean started) {
             this.started = started;
-            changed = true;
+            changedConfiguration = true;
         }
 
         final public boolean hasChanged() {
-            return changed;
+            return changedConfiguration;
         }
 
         final public boolean getReuse() {
@@ -291,10 +299,6 @@
         private EventDispatcher[] getDispatchers() {
             return dispatchers;
         }
-
-
-
-
     }
 
     final static class EventDispatcher {
@@ -329,7 +333,7 @@
     private final boolean active;
     protected final Runnable flushOperation = () -> runFlushActions();
 
-    // Updated by updateConfiguration()
+    // Modified by updateConfiguration()
     protected volatile StreamConfiguration configuration = new StreamConfiguration();
 
     // Cache the last event type and dispatch.
@@ -352,32 +356,20 @@
                 return null;
             }
         }, accessControlContext);
-
     }
 
     private void execute() {
         JVM.getJVM().exclude(Thread.currentThread());
         try {
             process();
-        } catch (IOException e) {
-            if (!isClosed()) {
-                logException(e);
-            }
         } catch (Exception e) {
-            logException(e);
+            defaultErrorHandler(e);
         } finally {
             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
         }
     }
 
-    private void logException(Exception e) {
-        // FIXME: e.printStackTrace(); for debugging purposes,
-        // remove before before integration
-        e.printStackTrace();
-        Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
-    }
-
-    public abstract void process() throws IOException;
+    public abstract void process() throws Exception;
 
     protected final void clearLastDispatch() {
         lastEventDispatch = null;
@@ -407,18 +399,33 @@
             try {
                 ret[i].offer(event);
             } catch (Exception e) {
-                logException(e);
+                handleError(e);
             }
         }
     }
 
+    protected final void handleError(Throwable e) {
+        StreamConfiguration c = configuration;
+        if (c.errorActions.length == 0) {
+            defaultErrorHandler(e);
+            return;
+        }
+        for (Runnable r : c.errorActions) {
+            r.run();
+        }
+    }
+
+    protected final void defaultErrorHandler(Throwable e) {
+        e.printStackTrace();
+    }
+
     public final void runCloseActions() {
         Runnable[] cas = configuration.getCloseActions();
         for (int i = 0; i < cas.length; i++) {
             try {
                 cas[i].run();
             } catch (Exception e) {
-                logException(e);
+                handleError(e);
             }
         }
     }
@@ -429,9 +436,10 @@
             try {
                 fas[i].run();
             } catch (Exception e) {
-                logException(e);
+                handleError(e);
             }
         }
+
     }
 
     // Purpose of synchronizing the following methods is
@@ -465,6 +473,10 @@
         updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
     }
 
+    public final synchronized void addErrorAction(Runnable action) {
+        updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
+    }
+
     public final synchronized void setClosed(boolean closed) {
         updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
     }
@@ -492,18 +504,17 @@
         updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
     }
 
-    public final void setEndTime(Instant endTime) {
-    if (configuration.isStarted()) {
-        throw new IllegalStateException("Stream is already started");
+    public final synchronized void setEndTime(Instant endTime) {
+        if (configuration.isStarted()) {
+            throw new IllegalStateException("Stream is already started");
+        }
+        updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
     }
-    updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
-}
-
 
     protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
-        // Changes to the configuration must happen one at a time, so make
-        // sure that we have the monitor
-        Thread.holdsLock(this);
+        if (!Thread.holdsLock(this)) {
+            throw new InternalError("Modification of configuration without proper lock");
+        }
         if (newConfiguration.hasChanged()) {
             // Publish objects held by configuration object
             VarHandle.releaseFence();
@@ -527,18 +538,16 @@
         run();
     }
 
-    private void startInternal(long startNanos) {
-        synchronized (this) {
-            if (configuration.isStarted()) {
-                throw new IllegalStateException("Event stream can only be started once");
-            }
-            StreamConfiguration c = new StreamConfiguration(configuration);
-            if (active) {
-                c.setStartNanos(startNanos);
-            }
-            c.setStarted(true);
-            updateConfiguration(c);
+    private synchronized void startInternal(long startNanos) {
+        if (configuration.isStarted()) {
+            throw new IllegalStateException("Event stream can only be started once");
         }
+        StreamConfiguration c = new StreamConfiguration(configuration);
+        if (active) {
+            c.setStartNanos(startNanos);
+        }
+        c.setStarted(true);
+        updateConfiguration(c);
     }
 
     public final void awaitTermination(Duration timeout) {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri Aug 30 20:39:38 2019 +0200
@@ -67,7 +67,7 @@
         }
 
         @Override
-        public void process() throws IOException {
+        public void process() throws Exception {
             final StreamConfiguration c1 = configuration;
             Path path;
             boolean validStartTime = active || c1.getStartTime() != null;
@@ -272,5 +272,11 @@
         eventStream.startAsync(startNanos);
     }
 
+    @Override
+    public void onError(Consumer<Throwable> action) {
+        // TODO Auto-generated method stub
+
+    }
+
 
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Fri Aug 30 20:39:38 2019 +0200
@@ -228,4 +228,10 @@
     public void setEndTime(Instant endTime) {
         eventStream.setEndTime(endTime);
     }
+
+    @Override
+    public void onError(Consumer<Throwable> action) {
+        // TODO Auto-generated method stub
+
+    }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Fri Aug 30 20:39:38 2019 +0200
@@ -133,6 +133,21 @@
     void onFlush(Runnable action);
 
     /**
+     * Performs an action if an exception occurs when processing the stream.
+     * <p>
+     * if an error handler has not been added to the stream, an exception stack
+     * trace is printed to standard error.
+     * <p>
+     * Adding an error handler overrides the default behavior. If multiple error
+     * handlers have been added, they will be executed in the order they were
+     * added.
+     *
+     * @param action an action to be performed if an exception occurs, not
+     *        {@code null}
+     */
+    void onError(Consumer<Throwable> action);
+
+    /**
      * Performs an action when the event stream is closed.
      * <p>
      * If the stream is already closed, the action will be executed immediately
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Fri Aug 30 20:39:38 2019 +0200
@@ -362,4 +362,9 @@
     public void setEndTime(Instant endTime) {
         stream.setStartTime(endTime);
     }
+
+    @Override
+    public void onError(Consumer<Throwable> action) {
+        stream.onError(action);
+    }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java	Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java	Fri Aug 30 20:39:38 2019 +0200
@@ -52,9 +52,9 @@
     private final RecordingInput input;
     private final long id;
     private long absoluteEventStart;
-    private long chunkSize;
-    private long constantPoolPosition;
-    private long metadataPosition;
+    private long chunkSize = 0;
+    private long constantPoolPosition = 0;
+    private long metadataPosition = 0;
     private long durationNanos;
     private long absoluteChunkEnd;
     private boolean isFinished;
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInternals.java	Thu Aug 29 19:40:37 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInternals.java	Fri Aug 30 20:39:38 2019 +0200
@@ -41,7 +41,7 @@
                 Class<?> c = RecordedObject.class;
                 Class.forName(c.getName(), true, c.getClassLoader());
             } catch (ClassNotFoundException e) {
-                new InternalError("shuld not happen");
+                throw new InternalError("Should not happen");
             }
         }
         return INSTANCE;