SetOrdered and setReuse now work on a directory stream JEP-349-branch
authoregahlin
Wed, 26 Jun 2019 16:04:47 +0200
branchJEP-349-branch
changeset 57432 ba454a26d2c1
parent 57428 ef3e241c420f
child 57433 83e4343a6984
SetOrdered and setReuse now work on a directory stream
src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java
test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Wed Jun 26 16:04:47 2019 +0200
@@ -71,7 +71,7 @@
     }
 
     public ChunkParser(ChunkParser previous) throws IOException {
-        this(new ChunkHeader(previous.input), null, 500);
+        this(new ChunkHeader(previous.input), previous, 500);
      }
 
     private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Wed Jun 26 16:04:47 2019 +0200
@@ -54,6 +54,11 @@
 import jdk.jfr.internal.consumer.EventConsumer;
 import jdk.jfr.internal.consumer.RecordingInput;
 
+/**
+ * Implementation of an {@code EventStream}} that operates against a directory
+ * with chunk files.
+ *
+ */
 final class EventDirectoryStream implements EventStream {
 
     private static final class RepositoryFiles {
@@ -67,7 +72,7 @@
         }
 
         long getTimestamp(Path p) {
-            return  pathLookup.get(p);
+            return pathLookup.get(p);
         }
 
         Path nextPath(long startTimeNanos) {
@@ -168,9 +173,7 @@
         private static final int DEFAULT_ARRAY_SIZE = 10_000;
         private final RepositoryFiles repositoryFiles;
         private ChunkParser chunkParser;
-        private boolean reuse = true;
         private RecordedEvent[] sortedList;
-        private boolean ordered = true;
 
         public ParserConsumer(AccessControlContext acc, Path p) throws IOException {
             super(acc);
@@ -200,6 +203,9 @@
                     }
 
                     path = repositoryFiles.nextPath(startNanos);
+                    if (path == null) {
+                        return; // stream closed
+                    }
                     startNanos = repositoryFiles.getTimestamp(path) + 1;
                     input.setFile(path);
                     chunkParser = chunkParser.newChunkParser();
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Wed Jun 26 16:04:47 2019 +0200
@@ -53,7 +53,6 @@
         private RecordedEvent[] sortedList;
         private boolean ordered;
 
-
         public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
             super(acc);
             this.input = input;
@@ -68,8 +67,8 @@
                 chunkParser.setReuse(reuse);
                 chunkParser.setOrdered(ordered);
                 chunkParser.resetEventCache();
+                chunkParser.setParserFilter(eventFilter);
                 chunkParser.updateEventParsers();
-
                 if (ordered) {
                     processOrdered();
                 } else {
@@ -78,6 +77,7 @@
                 if (chunkParser.isLastChunk()) {
                     return;
                 }
+                runFlushActions();
                 chunkParser = chunkParser.nextChunkParser();
             }
         }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Wed Jun 26 16:04:47 2019 +0200
@@ -331,7 +331,7 @@
 
     @Override
     public void setReuse(boolean reuse) {
-        throw new UnsupportedOperationException("Unordered not implemented yet");
+        stream.setReuse(reuse);
     }
 
     @Override
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Tue Jun 25 14:02:06 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Wed Jun 26 16:04:47 2019 +0200
@@ -78,6 +78,8 @@
     private final static VarHandle dispatcherHandle;
     private final static VarHandle flushActionsHandle;
     private final static VarHandle closeActionsHandle;
+    private final static VarHandle orderedHandle;
+    private final static VarHandle reuseHandle;
     static {
         try {
             MethodHandles.Lookup l = MethodHandles.lookup();
@@ -86,6 +88,9 @@
             dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class);
             flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class);
             closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class);
+            orderedHandle = l.findVarHandle(EventConsumer.class, "ordered", boolean.class);
+            reuseHandle = l.findVarHandle(EventConsumer.class, "reuse", boolean.class);
+
         } catch (ReflectiveOperationException e) {
             throw new InternalError(e);
         }
@@ -101,6 +106,10 @@
     // set by VarHandle
     private Runnable[] closeActions = new Runnable[0];
 
+    protected boolean ordered = true;
+
+    protected boolean reuse = true;
+
     protected InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
 
     private final AccessControlContext accessControlContext;
@@ -329,11 +338,11 @@
     abstract public void close();
 
     public void setReuse(boolean reuse) {
-
+        reuseHandle.setVolatile(this, reuse);
     }
 
     public void setOrdered(boolean ordered) {
-
+        orderedHandle.setVolatile(this, ordered);
     }
 
 }
\ No newline at end of file
--- a/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java	Tue Jun 25 14:02:06 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java	Wed Jun 26 16:04:47 2019 +0200
@@ -92,7 +92,7 @@
                 es.onEvent(e -> {
                     Instant endTime = e.getEndTime();
                     if (endTime.isBefore(timestamp.get())) {
-                        throw new Error("Events are not ordered! Reues = " + reuse);
+                        throw new Error("Events are not ordered! Reuse = " + reuse);
                     }
                     timestamp.set(endTime);
                 });
@@ -118,7 +118,7 @@
                 });
                 es.start();
                 if (!unoreded.get()) {
-                    throw new Exception("Expected at least some events to be out of order! Reues = " + reuse);
+                    throw new Exception("Expected at least some events to be out of order! Reuse = " + reuse);
                 }
             }
         }