src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branchJEP-349-branch
changeset 57372 50ca040843ea
parent 57361 53dccc90a5be
child 57373 400db63e4937
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri May 24 19:39:31 2019 +0200
@@ -32,141 +32,144 @@
 import java.util.Objects;
 import java.util.function.Consumer;
 
-import jdk.jfr.EventType;
+import jdk.jfr.internal.consumer.EventConsumer;
 
 final class EventDirectoryStream implements EventStream {
 
-    public final static class EventConsumer {
-        final private String eventName;
-        final Consumer<RecordedEvent> action;
+    private static class EventRunner extends EventConsumer {
+        private EventSetLocation location;
+        private EventSet eventSet;
+        private int eventSetIndex;
+        private int eventArrayIndex;
+        private RecordedEvent[] currentEventArray = new RecordedEvent[0];
 
-        EventConsumer(String eventName, Consumer<RecordedEvent> eventConsumer) {
-            this.eventName = eventName;
-            this.action = eventConsumer;
+        public EventRunner(AccessControlContext acc) throws IOException {
+            super(acc);
         }
 
-        public void offer(RecordedEvent event) {
-            action.accept(event);
+        public void process() throws Exception, IOException {
+            this.location = EventSetLocation.current();
+            this.eventSet = location.acquire(startNanos, null); // use timestamp
+                                                                // from
+            if (eventSet == null) {
+                return;
+            }
+            while (!isClosed()) {
+                processSegment();
+                runFlushActions();
+                do {
+                    if (isClosed()) {
+                        return;
+                    }
+                    currentEventArray = eventSet.readEvents(eventSetIndex);
+                    if (currentEventArray == EventSet.END_OF_SET) {
+                        eventSet = eventSet.next(eventFilter);
+                        if (eventSet == null || isClosed()) {
+                            return;
+                        }
+                        eventSetIndex = 0;
+                        continue;
+                    }
+                    if (currentEventArray == null) {
+                        return; // no more events
+                    }
+                    eventSetIndex++;
+                } while (currentEventArray.length == 0);
+                eventArrayIndex = 0;
+            }
         }
 
-        public boolean accepts(EventType eventType) {
-            return (eventName == null || eventType.getName().equals(eventName));
+        private void processSegment() {
+            while (eventArrayIndex < currentEventArray.length) {
+                RecordedEvent e = currentEventArray[eventArrayIndex++];
+                if (e == null) {
+                    return;
+                }
+                dispatch(e);
+            }
+        }
+
+        public void close() {
+            setClosed(true);
+            // TODO: Data races here, must fix
+            synchronized (this) {
+                if (eventSet != null) {
+                    eventSet.release(null);
+                }
+                if (location != null) {
+                    location.release();
+                }
+            }
+            runCloseActions();
         }
     }
 
-    private final EventRunner eventRunner;
-    private Thread thread;
-    private boolean started;
+    private final EventRunner eventConsumer;
 
     public EventDirectoryStream(AccessControlContext acc) throws IOException {
-        eventRunner = new EventRunner(acc);
+        eventConsumer = new EventRunner(acc);
     }
 
     public void close() {
-        synchronized (eventRunner) {
-            eventRunner.close();
-        }
+        eventConsumer.close();
     }
 
-    public synchronized void onFlush(Runnable action) {
+    public void onFlush(Runnable action) {
         Objects.requireNonNull(action);
-        synchronized (eventRunner) {
-            this.eventRunner.addFlush(action);
-        }
+        eventConsumer.onFlush(action);
     }
 
     void start(long startNanos) {
-        synchronized (eventRunner) {
-            if (started) {
-                throw new IllegalStateException("Event stream can only be started once");
-            }
-            started = true;
-            eventRunner.setStartNanos(startNanos);
-        }
-        eventRunner.run();
+        eventConsumer.start(startNanos);
     }
 
     @Override
     public void start() {
-        start(Instant.now().toEpochMilli() * 1000*1000L);
+        start(Instant.now().toEpochMilli() * 1000 * 1000L);
     }
 
     @Override
     public void startAsync() {
-        startAsync(Instant.now().toEpochMilli() * 1000*1000L);
+        startAsync(Instant.now().toEpochMilli() * 1000 * 1000L);
     }
 
     void startAsync(long startNanos) {
-        synchronized (eventRunner) {
-            eventRunner.setStartNanos(startNanos);
-            thread = new Thread(eventRunner);
-            thread.setDaemon(true);
-            thread.start();
-        }
+        eventConsumer.startAsync(startNanos);
     }
 
-    public void addEventConsumer(EventConsumer action) {
-        Objects.requireNonNull(action);
-        synchronized (eventRunner) {
-            eventRunner.add(action);
-        }
-    }
-
-
-
     @Override
     public void onEvent(Consumer<RecordedEvent> action) {
         Objects.requireNonNull(action);
-        synchronized (eventRunner) {
-            eventRunner.add(new EventConsumer(null, action));
-        }
+        eventConsumer.onEvent(action);
     }
 
     @Override
     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
         Objects.requireNonNull(eventName);
         Objects.requireNonNull(action);
-        synchronized (eventRunner) {
-            eventRunner.add(new EventConsumer(eventName, action));
-        }
+        eventConsumer.onEvent(eventName, action);
     }
 
     @Override
     public void onClose(Runnable action) {
         Objects.requireNonNull(action);
-        synchronized (eventRunner) {
-            eventRunner.addCloseAction(action);
-        }
+        eventConsumer.addCloseAction(action);
     }
 
     @Override
     public boolean remove(Object action) {
         Objects.requireNonNull(action);
-        synchronized (eventRunner) {
-            return eventRunner.remove(action);
-        }
+        return eventConsumer.remove(action);
     }
 
     @Override
     public void awaitTermination(Duration timeout) {
         Objects.requireNonNull(timeout);
-        Thread t = null;
-        synchronized (eventRunner) {
-            t = thread;
-        }
-        if (t != null && t != Thread.currentThread()) {
-            try {
-                t.join(timeout.toMillis());
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
+        eventConsumer.awaitTermination(timeout);
     }
 
     @Override
     public void awaitTermination() {
-        awaitTermination(Duration.ofMillis(0));
+        eventConsumer.awaitTermination(Duration.ofMillis(0));
     }
-
-
 }