src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
branchJEP-349-branch
changeset 58020 f082177c5023
parent 57985 be121cbf3284
child 58129 7b751fe181a5
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Thu Sep 05 16:46:50 2019 +0200
@@ -33,10 +33,10 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 import jdk.jfr.EventType;
@@ -44,10 +44,7 @@
 import jdk.jfr.internal.LogLevel;
 import jdk.jfr.internal.LogTag;
 import jdk.jfr.internal.Logger;
-import jdk.jfr.internal.LongMap;
 import jdk.jfr.internal.SecuritySupport;
-import jdk.jfr.internal.Utils;
-import jdk.jfr.internal.consumer.InternalEventFilter;
 
 /*
  * Purpose of this class is to simplify the implementation of
@@ -62,254 +59,10 @@
  */
 abstract class AbstractEventStream implements EventStream {
 
-    protected static final class StreamConfiguration {
-        private static final Runnable[] NO_ACTIONS = new Runnable[0];
-
-        private Consumer<?>[] errorActions = new Consumer<?>[0];
-        private Runnable[] flushActions = NO_ACTIONS;
-        private Runnable[] closeActions = NO_ACTIONS;
-        private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
-        private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
-        private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
-
-        private boolean changedConfiguration = false;
-        private boolean closed = false;
-        private boolean reuse = true;
-        private boolean ordered = true;
-        private Instant startTime = null;
-        private Instant endTime = null;
-        private boolean started = false;
-        private long startNanos = 0;
-        private long endNanos = Long.MAX_VALUE;
-
-        public StreamConfiguration(StreamConfiguration configuration) {
-            this.flushActions = configuration.flushActions;
-            this.closeActions = configuration.closeActions;
-            this.errorActions = configuration.errorActions;
-            this.dispatchers = configuration.dispatchers;
-            this.eventFilter = configuration.eventFilter;
-            this.closed = configuration.closed;
-            this.reuse = configuration.reuse;
-            this.ordered = configuration.ordered;
-            this.startTime = configuration.startTime;
-            this.endTime = configuration.endTime;
-            this.started = configuration.started;
-            this.startNanos = configuration.startNanos;
-            this.endNanos = configuration.endNanos;
-            this.dispatcherLookup = configuration.dispatcherLookup;
-        }
-
-        public StreamConfiguration() {
-        }
-
-        public StreamConfiguration remove(Object action) {
-            flushActions = remove(flushActions, action);
-            closeActions = remove(closeActions, action);
-            dispatchers = removeDispatch(dispatchers, action);
-            return this;
-        }
-
-        public StreamConfiguration addDispatcher(EventDispatcher e) {
-            dispatchers = add(dispatchers, e);
-            eventFilter = buildFilter(dispatchers);
-            dispatcherLookup = new LongMap<>();
-            return this;
-        }
-
-        public StreamConfiguration addFlushAction(Runnable action) {
-            flushActions = add(flushActions, action);
-            return this;
-        }
-
-        public StreamConfiguration addCloseAction(Runnable action) {
-            closeActions = add(closeActions, action);
-            return this;
-        }
-
-        public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
-            errorActions = add(errorActions, action);
-            return this;
-        }
-
-        public StreamConfiguration setClosed(boolean closed) {
-            this.closed = closed;
-            changedConfiguration = true;
-            return this;
-        }
-
-        public boolean isClosed() {
-            return closed;
-        }
-
-        public Runnable[] getCloseActions() {
-            return closeActions;
-        }
-
-        public Runnable[] getFlushActions() {
-            return flushActions;
-        }
-
-        private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
-            List<EventDispatcher> list = new ArrayList<>(array.length);
-            boolean modified = false;
-            for (int i = 0; i < array.length; i++) {
-                if (array[i].action != action) {
-                    list.add(array[i]);
-                } else {
-                    modified = true;
-                }
-            }
-            EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
-            if (modified) {
-                eventFilter = buildFilter(result);
-                dispatcherLookup = new LongMap<>();
-                changedConfiguration = true;
-            }
-            return result;
-        }
-
-        private <T> T[] remove(T[] array, Object action) {
-            List<T> list = new ArrayList<>(array.length);
-            for (int i = 0; i < array.length; i++) {
-                if (array[i] != action) {
-                    list.add(array[i]);
-                } else {
-                    changedConfiguration = true;
-                }
-            }
-            return list.toArray(array);
-        }
-
-        private <T> T[] add(T[] array, T object) {
-            List<T> list = new ArrayList<>(Arrays.asList(array));
-            list.add(object);
-            changedConfiguration = true;
-            return list.toArray(array);
-        }
-
-        private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
-            InternalEventFilter ef = new InternalEventFilter();
-            for (EventDispatcher ed : dispatchers) {
-                String name = ed.eventName;
-                if (name == null) {
-                    return InternalEventFilter.ACCEPT_ALL;
-                }
-                ef.setThreshold(name, 0);
-            }
-            return ef;
-        }
-
-        public StreamConfiguration setReuse(boolean reuse) {
-            this.reuse = reuse;
-            changedConfiguration = true;
-            return this;
-        }
-
-        public StreamConfiguration setOrdered(boolean ordered) {
-            this.ordered = ordered;
-            changedConfiguration = true;
-            return this;
-        }
-
-        public StreamConfiguration setEndTime(Instant endTime) {
-            this.endTime = endTime;
-            this.endNanos = Utils.timeToNanos(endTime);
-            changedConfiguration = true;
-            return this;
-        }
-
-        public StreamConfiguration setStartTime(Instant startTime) {
-            this.startTime = startTime;
-            this.startNanos = Utils.timeToNanos(startTime);
-            changedConfiguration = true;
-            return this;
-        }
-
-        public Instant getStartTime() {
-            return startTime;
-        }
-
-        public Object getEndTime() {
-            return endTime;
-        }
-
-        public boolean isStarted() {
-            return started;
-        }
-
-        public StreamConfiguration setStartNanos(long startNanos) {
-            this.startNanos = startNanos;
-            changedConfiguration = true;
-            return this;
-        }
-
-        public void setStarted(boolean started) {
-            this.started = started;
-            changedConfiguration = true;
-        }
-
-        public boolean hasChanged() {
-            return changedConfiguration;
-        }
-
-        public boolean getReuse() {
-            return reuse;
-        }
-
-        public boolean getOrdered() {
-            return ordered;
-        }
-
-        public InternalEventFilter getFiler() {
-            return eventFilter;
-        }
-
-        public long getStartNanos() {
-            return startNanos;
-        }
-
-        public long getEndNanos() {
-            return endNanos;
-        }
-
-        public InternalEventFilter getFilter() {
-            return eventFilter;
-        }
-
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            for (Runnable flush : flushActions) {
-                sb.append("Flush Action: ").append(flush).append("\n");
-            }
-            for (Runnable close : closeActions) {
-                sb.append("Close Action: " + close + "\n");
-            }
-            for (Consumer<?> error : errorActions) {
-                sb.append("Error Action: " + error + "\n");
-            }
-            for (EventDispatcher dispatcher : dispatchers) {
-                sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
-            }
-            sb.append("Closed: ").append(closed).append("\n");
-            sb.append("Reuse: ").append(reuse).append("\n");
-            sb.append("Ordered: ").append(ordered).append("\n");
-            sb.append("Started: ").append(started).append("\n");
-            sb.append("Start Time: ").append(startTime).append("\n");
-            sb.append("Start Nanos: ").append(startNanos).append("\n");
-            sb.append("End Time: ").append(endTime).append("\n");
-            sb.append("End Nanos: ").append(endNanos).append("\n");
-            return sb.toString();
-        }
-
-        private EventDispatcher[] getDispatchers() {
-            return dispatchers;
-        }
-    }
-
-    private final static class EventDispatcher {
+    final static class EventDispatcher {
         final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-        final private String eventName;
-        final private Consumer<RecordedEvent> action;
+        final String eventName;
+        final Consumer<RecordedEvent> action;
 
         public EventDispatcher(Consumer<RecordedEvent> action) {
             this(null, action);
@@ -330,8 +83,9 @@
     }
 
     final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
-
-    private final Thread thread;
+    private final static AtomicLong counter = new AtomicLong(1);
+    private volatile Thread thread;
+    private final Object terminated = new Object();
     private final boolean active;
     private final Runnable flushOperation = () -> runFlushActions();
     private final AccessControlContext accessControllerContext;
@@ -340,14 +94,9 @@
     // Modified by updateConfiguration()
     protected volatile StreamConfiguration configuration = new StreamConfiguration();
 
-    // Cache the last event type and dispatch.
-    private EventType lastEventType;
-    private EventDispatcher[] lastEventDispatch;
-
     public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
         this.accessControllerContext = Objects.requireNonNull(acc);
         this.active = active;
-        this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc));
     }
 
     @Override
@@ -360,7 +109,7 @@
     abstract public void close();
 
     // Purpose of synchronizing the following methods is
-    // to serialize changes to the configuration, so only one
+    // to serialize changes to the configuration so only one
     // thread at a time can change the configuration.
     //
     // The purpose is not to guard the configuration field. A new
@@ -455,47 +204,67 @@
     }
 
     @Override
-    public final void awaitTermination() {
+    public final void awaitTermination() throws InterruptedException {
         awaitTermination(Duration.ofMillis(0));
     }
 
     @Override
-    public final void awaitTermination(Duration timeout) {
+    public final void awaitTermination(Duration timeout) throws InterruptedException {
         Objects.requireNonNull(timeout);
-        if (thread != Thread.currentThread()) {
-            try {
-                thread.join(timeout.toMillis());
-            } catch (InterruptedException e) {
-                // ignore
+        if (timeout.isNegative()) {
+            throw new IllegalArgumentException("timeout value is negative");
+        }
+
+        long base = System.currentTimeMillis();
+        long now = 0;
+
+        long millis;
+        try {
+            millis = Math.multiplyExact(timeout.getSeconds(), 1000);
+        } catch (ArithmeticException a) {
+            millis = Long.MAX_VALUE;
+        }
+        int nanos = timeout.toNanosPart();
+        if (nanos == 0 && millis == 0) {
+            synchronized (terminated) {
+                while (!isClosed()) {
+                    terminated.wait(0);
+                }
+            }
+        } else {
+            while (!isClosed()) {
+                long delay = millis - now;
+                if (delay <= 0) {
+                    break;
+                }
+                synchronized (terminated) {
+                    terminated.wait(delay, nanos);
+                }
+                now = System.currentTimeMillis() - base;
             }
         }
     }
 
     protected abstract void process() throws Exception;
 
-    protected final void clearLastDispatch() {
-        lastEventDispatch = null;
-        lastEventType = null;
-    }
-
-    protected final void dispatch(RecordedEvent event) {
+    protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
         EventType type = event.getEventType();
         EventDispatcher[] dispatchers = null;
-        if (type == lastEventType) {
-            dispatchers = lastEventDispatch;
+        if (type == c.cacheEventType) {
+            dispatchers = c.cacheDispatchers;
         } else {
-            dispatchers = configuration.dispatcherLookup.get(type.getId());
+            dispatchers = c.dispatcherLookup.get(type.getId());
             if (dispatchers == null) {
                 List<EventDispatcher> list = new ArrayList<>();
-                for (EventDispatcher e : configuration.getDispatchers()) {
+                for (EventDispatcher e : c.getDispatchers()) {
                     if (e.accepts(type)) {
                         list.add(e);
                     }
                 }
                 dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
-                configuration.dispatcherLookup.put(type.getId(), dispatchers);
+                c.dispatcherLookup.put(type.getId(), dispatchers);
             }
-            lastEventDispatch = dispatchers;
+            c.cacheDispatchers = dispatchers;
         }
         for (int i = 0; i < dispatchers.length; i++) {
             try {
@@ -529,11 +298,14 @@
 
     protected final void startAsync(long startNanos) {
         startInternal(startNanos);
+        Runnable r = () -> run(accessControllerContext);
+        thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r);
         thread.start();
     }
 
     protected final void start(long startNanos) {
         startInternal(startNanos);
+        thread = Thread.currentThread();
         run(accessControllerContext);
     }
 
@@ -580,6 +352,13 @@
             defaultErrorHandler(e);
         } finally {
             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
+            try {
+                close();
+            } finally {
+                synchronized (terminated) {
+                    terminated.notifyAll();
+                }
+            }
         }
     }
 
@@ -591,15 +370,11 @@
         }
         for (int i = 0; i < consumers.length; i++) {
             @SuppressWarnings("unchecked")
-            Consumer<Throwable> c = (Consumer<Throwable>) consumers[i];
-            c.accept(e);
+            Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
+            conusmer.accept(e);
         }
     }
 
-    private void defaultErrorHandler(Throwable e) {
-        e.printStackTrace();
-    }
-
     private void runFlushActions() {
         Runnable[] flushActions = configuration.getFlushActions();
         for (int i = 0; i < flushActions.length; i++) {
@@ -611,13 +386,22 @@
         }
     }
 
-    private void run(AccessControlContext acc) {
+    private void run(AccessControlContext accessControlContext) {
         AccessController.doPrivileged(new PrivilegedAction<Void>() {
             @Override
             public Void run() {
                 execute();
                 return null;
             }
-        }, acc);
+        }, accessControlContext);
+    }
+
+    private String nextThreadName() {
+        counter.incrementAndGet();
+        return "JFR Event Stream " + counter;
+    }
+
+    private void defaultErrorHandler(Throwable e) {
+        e.printStackTrace();
     }
 }
\ No newline at end of file