src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
branchJEP-349-branch
changeset 58129 7b751fe181a5
parent 58020 f082177c5023
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Fri Sep 13 18:46:07 2019 +0200
@@ -26,20 +26,16 @@
 package jdk.jfr.consumer;
 
 import java.io.IOException;
-import java.lang.invoke.VarHandle;
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
-import jdk.jfr.EventType;
 import jdk.jfr.internal.JVM;
 import jdk.jfr.internal.LogLevel;
 import jdk.jfr.internal.LogTag;
@@ -59,40 +55,19 @@
  */
 abstract class AbstractEventStream implements EventStream {
 
-    final static class EventDispatcher {
-        final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-        final String eventName;
-        final Consumer<RecordedEvent> action;
-
-        public EventDispatcher(Consumer<RecordedEvent> action) {
-            this(null, action);
-        }
+    static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
 
-        public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
-            this.eventName = eventName;
-            this.action = action;
-        }
-
-        public void offer(RecordedEvent event) {
-            action.accept(event);
-        }
-
-        public boolean accepts(EventType eventType) {
-            return (eventName == null || eventType.getName().equals(eventName));
-        }
-    }
-
-    final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
     private final static AtomicLong counter = new AtomicLong(1);
-    private volatile Thread thread;
     private final Object terminated = new Object();
     private final boolean active;
-    private final Runnable flushOperation = () -> runFlushActions();
+    private final Runnable flushOperation = () -> dispatcher().runFlushActions();
     private final AccessControlContext accessControllerContext;
-    private final Object configurationLock = new Object();
+    private final StreamConfiguration configuration = new StreamConfiguration();
 
-    // Modified by updateConfiguration()
-    protected volatile StreamConfiguration configuration = new StreamConfiguration();
+    private volatile Thread thread;
+    private Dispatcher dispatcher;
+
+    private volatile boolean closed;
 
     public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
         this.accessControllerContext = Objects.requireNonNull(acc);
@@ -108,99 +83,85 @@
     @Override
     abstract public void close();
 
-    // Purpose of synchronizing the following methods is
-    // to serialize changes to the configuration so only one
-    // thread at a time can change the configuration.
-    //
-    // The purpose is not to guard the configuration field. A new
-    // configuration is published using updateConfiguration
-    //
+    protected final Dispatcher dispatcher() {
+        if (configuration.hasChanged()) {
+            synchronized (configuration) {
+                dispatcher = new Dispatcher(configuration);
+            }
+        }
+        return dispatcher;
+    }
+
     @Override
     public final void setOrdered(boolean ordered) {
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
-        }
+        configuration.setOrdered(ordered);
     }
 
     @Override
     public final void setReuse(boolean reuse) {
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
-        }
+        configuration.setReuse(reuse);
     }
 
     @Override
     public final void setStartTime(Instant startTime) {
         Objects.nonNull(startTime);
-        synchronized (configurationLock) {
-            if (configuration.isStarted()) {
+        synchronized (configuration) {
+            if (configuration.started) {
                 throw new IllegalStateException("Stream is already started");
             }
             if (startTime.isBefore(Instant.EPOCH)) {
                 startTime = Instant.EPOCH;
             }
-            updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
+            configuration.setStartTime(startTime);
         }
     }
 
     @Override
     public final void setEndTime(Instant endTime) {
         Objects.requireNonNull(endTime);
-        synchronized (configurationLock) {
-            if (configuration.isStarted()) {
+        synchronized (configuration) {
+            if (configuration.started) {
                 throw new IllegalStateException("Stream is already started");
             }
-            updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
+            configuration.setEndTime(endTime);
         }
     }
 
     @Override
     public final void onEvent(Consumer<RecordedEvent> action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            add(new EventDispatcher(action));
-        }
+        configuration.addEventAction(action);
     }
 
     @Override
     public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
         Objects.requireNonNull(eventName);
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            add(new EventDispatcher(eventName, action));
-        }
+        configuration.addEventAction(eventName, action);
     }
 
     @Override
     public final void onFlush(Runnable action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
-        }
+        configuration.addFlushAction(action);
     }
 
     @Override
     public final void onClose(Runnable action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
-        }
+        configuration.addCloseAction(action);
     }
 
     @Override
     public final void onError(Consumer<Throwable> action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
-        }
+        configuration.addErrorAction(action);
     }
 
     @Override
     public final boolean remove(Object action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            return updateConfiguration(new StreamConfiguration(configuration).remove(action));
-        }
+        return configuration.remove(action);
     }
 
     @Override
@@ -247,53 +208,12 @@
 
     protected abstract void process() throws Exception;
 
-    protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
-        EventType type = event.getEventType();
-        EventDispatcher[] dispatchers = null;
-        if (type == c.cacheEventType) {
-            dispatchers = c.cacheDispatchers;
-        } else {
-            dispatchers = c.dispatcherLookup.get(type.getId());
-            if (dispatchers == null) {
-                List<EventDispatcher> list = new ArrayList<>();
-                for (EventDispatcher e : c.getDispatchers()) {
-                    if (e.accepts(type)) {
-                        list.add(e);
-                    }
-                }
-                dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
-                c.dispatcherLookup.put(type.getId(), dispatchers);
-            }
-            c.cacheDispatchers = dispatchers;
-        }
-        for (int i = 0; i < dispatchers.length; i++) {
-            try {
-                dispatchers[i].offer(event);
-            } catch (Exception e) {
-                handleError(e);
-            }
-        }
-    }
-
-    protected final void runCloseActions() {
-        Runnable[] closeActions = configuration.getCloseActions();
-        for (int i = 0; i < closeActions.length; i++) {
-            try {
-                closeActions[i].run();
-            } catch (Exception e) {
-                handleError(e);
-            }
-        }
-    }
-
     protected final void setClosed(boolean closed) {
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
-        }
+        this.closed = closed;
     }
 
     protected final boolean isClosed() {
-        return configuration.isClosed();
+        return closed;
     }
 
     protected final void startAsync(long startNanos) {
@@ -313,34 +233,15 @@
         return flushOperation;
     }
 
-    private void add(EventDispatcher e) {
-        updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
-    }
-
-    private boolean updateConfiguration(StreamConfiguration newConfiguration) {
-        if (!Thread.holdsLock(configurationLock)) {
-            throw new InternalError("Modification of configuration without proper lock");
-        }
-        if (newConfiguration.hasChanged()) {
-            // Publish objects held by configuration object
-            VarHandle.releaseFence();
-            configuration = newConfiguration;
-            return true;
-        }
-        return false;
-    }
-
     private void startInternal(long startNanos) {
-        synchronized (configurationLock) {
-            if (configuration.isStarted()) {
+        synchronized (configuration) {
+            if (configuration.started) {
                 throw new IllegalStateException("Event stream can only be started once");
             }
-            StreamConfiguration c = new StreamConfiguration(configuration);
             if (active) {
-                c.setStartNanos(startNanos);
+                configuration.setStartNanos(startNanos);
             }
-            c.setStarted(true);
-            updateConfiguration(c);
+            configuration.setStarted(true);
         }
     }
 
@@ -348,8 +249,14 @@
         JVM.getJVM().exclude(Thread.currentThread());
         try {
             process();
+        } catch (IOException ioe) {
+            // This can happen if a chunk file is removed, or
+            // a file is access that has been closed
+            // This is "normal" behavior for streaming and the
+            // stream will be closed when this happens
         } catch (Exception e) {
-            defaultErrorHandler(e);
+            // TODO: Remove before integrating
+            e.printStackTrace();
         } finally {
             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
             try {
@@ -362,30 +269,6 @@
         }
     }
 
-    private void handleError(Throwable e) {
-        Consumer<?>[] consumers = configuration.errorActions;
-        if (consumers.length == 0) {
-            defaultErrorHandler(e);
-            return;
-        }
-        for (int i = 0; i < consumers.length; i++) {
-            @SuppressWarnings("unchecked")
-            Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
-            conusmer.accept(e);
-        }
-    }
-
-    private void runFlushActions() {
-        Runnable[] flushActions = configuration.getFlushActions();
-        for (int i = 0; i < flushActions.length; i++) {
-            try {
-                flushActions[i].run();
-            } catch (Exception e) {
-                handleError(e);
-            }
-        }
-    }
-
     private void run(AccessControlContext accessControlContext) {
         AccessController.doPrivileged(new PrivilegedAction<Void>() {
             @Override
@@ -400,8 +283,4 @@
         counter.incrementAndGet();
         return "JFR Event Stream " + counter;
     }
-
-    private void defaultErrorHandler(Throwable e) {
-        e.printStackTrace();
-    }
 }
\ No newline at end of file