src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
branchJEP-349-branch
changeset 57604 838f9a7635b6
parent 57452 6fabe73e5d9a
child 57628 f5f590eaecf5
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Wed Jul 31 14:07:44 2019 +0200
@@ -45,43 +45,52 @@
  */
 final class EventDirectoryStream implements EventStream {
 
-    static final class DirectoryConsumer extends EventConsumer {
+    static final class DirectoryStream extends AbstractEventStream {
 
         private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
         private static final int DEFAULT_ARRAY_SIZE = 10_000;
+
         private final RepositoryFiles repositoryFiles;
+
         private ChunkParser chunkParser;
         private RecordedEvent[] sortedList;
         protected long chunkStartNanos;
 
-        public DirectoryConsumer(AccessControlContext acc, Path p) throws IOException {
+        public DirectoryStream(AccessControlContext acc, Path p) throws IOException {
             super(acc);
             repositoryFiles = new RepositoryFiles(p);
         }
 
         @Override
         public void process() throws IOException {
-            chunkStartNanos = startNanos;
+            StreamConfiguration c1 = configuration;
+            chunkStartNanos = c1.getStartNanos();
             Path path;
-            if (startTime == EventConsumer.NEXT_EVENT) {
+            if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) {
                 // TODO: Need to skip forward to the next event
                 // For now, use the last chunk.
                 path = repositoryFiles.lastPath();
             } else {
                 path = repositoryFiles.nextPath(chunkStartNanos);
             }
+            if (path == null) { // closed
+                return;
+            }
             chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
             try (RecordingInput input = new RecordingInput(path.toFile())) {
-                chunkParser = new ChunkParser(input, this.reuse);
+                chunkParser = new ChunkParser(input, c1.getReuse());
                 while (!isClosed()) {
                     boolean awaitnewEvent = false;
                     while (!isClosed() && !chunkParser.isChunkFinished()) {
-                        chunkParser.setReuse(this.reuse);
-                        chunkParser.setOrdered(this.ordered);
-                        chunkParser.setFirstNanos(startNanos);
+                        final StreamConfiguration c2 = configuration;
+                        boolean ordered = c2.getOrdered();
+                        chunkParser.setReuse(c2.getReuse());
+                        chunkParser.setOrdered(ordered);
+                        chunkParser.setFirstNanos(c2.getStartNanos());
                         chunkParser.resetEventCache();
-                        chunkParser.setParserFilter(this.eventFilter);
+                        chunkParser.setParserFilter(c2.getFilter());
                         chunkParser.updateEventParsers();
+                        clearLastDispatch();
                         if (ordered) {
                             awaitnewEvent = processOrdered(awaitnewEvent);
                         } else {
@@ -101,6 +110,7 @@
             }
         }
 
+
         private boolean processOrdered(boolean awaitNewEvents) throws IOException {
             if (sortedList == null) {
                 sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
@@ -139,39 +149,36 @@
             while (true) {
                 RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
                 if (e == null) {
-                    awaitNewEvents = true;
-                    break;
+                    return true;
                 } else {
                     dispatch(e);
                 }
             }
-            return awaitNewEvents;
         }
 
         @Override
         public void close() {
+            setClosed(true);
             repositoryFiles.close();
         }
     }
 
-    private final EventConsumer eventConsumer;
+    private final AbstractEventStream eventStream;
 
     public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException {
-        eventConsumer = new DirectoryConsumer(acc, p);
-        eventConsumer.startTime = startTime;
+        eventStream = new DirectoryStream(acc, p);
+        eventStream.setStartTime(startTime);
     }
 
+    @Override
     public void close() {
-        eventConsumer.close();
+        eventStream.close();
     }
 
+    @Override
     public void onFlush(Runnable action) {
         Objects.requireNonNull(action);
-        eventConsumer.onFlush(action);
-    }
-
-    void start(long startNanos) {
-        eventConsumer.start(startNanos);
+        eventStream.onFlush(action);
     }
 
     @Override
@@ -184,58 +191,62 @@
         startAsync(Instant.now().toEpochMilli() * 1000 * 1000L);
     }
 
-    void startAsync(long startNanos) {
-        eventConsumer.startAsync(startNanos);
-    }
-
     @Override
     public void onEvent(Consumer<RecordedEvent> action) {
         Objects.requireNonNull(action);
-        eventConsumer.onEvent(action);
+        eventStream.onEvent(action);
     }
 
     @Override
     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
         Objects.requireNonNull(eventName);
         Objects.requireNonNull(action);
-        eventConsumer.onEvent(eventName, action);
+        eventStream.onEvent(eventName, action);
     }
 
     @Override
     public void onClose(Runnable action) {
         Objects.requireNonNull(action);
-        eventConsumer.addCloseAction(action);
+        eventStream.addCloseAction(action);
     }
 
     @Override
     public boolean remove(Object action) {
         Objects.requireNonNull(action);
-        return eventConsumer.remove(action);
+        return eventStream.remove(action);
     }
 
     @Override
     public void awaitTermination(Duration timeout) {
         Objects.requireNonNull(timeout);
-        eventConsumer.awaitTermination(timeout);
+        eventStream.awaitTermination(timeout);
     }
 
     @Override
     public void awaitTermination() {
-        eventConsumer.awaitTermination(Duration.ofMillis(0));
+        eventStream.awaitTermination(Duration.ofMillis(0));
     }
 
     @Override
     public void setReuse(boolean reuse) {
-        eventConsumer.setReuse(reuse);
+        eventStream.setReuse(reuse);
     }
 
     @Override
     public void setOrdered(boolean ordered) {
-        eventConsumer.setOrdered(ordered);
+        eventStream.setOrdered(ordered);
     }
 
     @Override
     public void setStartTime(Instant startTime) {
-        eventConsumer.setStartTime(startTime);
+        eventStream.setStartTime(startTime);
+    }
+
+    public void start(long startNanos) {
+        eventStream.start(startNanos);
+    }
+
+    public void startAsync(long startNanos) {
+        eventStream.startAsync(startNanos);
     }
 }