Make setReuse and setOrdered work across chunk boundaries. Improved unit tests JEP-349-branch
authoregahlin
Fri, 31 May 2019 20:44:28 +0200
branchJEP-349-branch
changeset 57385 7d9d4f629f6e
parent 57381 ce265e404c64
child 57386 acdd0dbe37ee
Make setReuse and setOrdered work across chunk boundaries. Improved unit tests
src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java
test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java
test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Fri May 31 20:44:28 2019 +0200
@@ -82,6 +82,9 @@
             this.constantLookups = previous.constantLookups;
             this.previousMetadata = previous.metadata;
             this.pollInterval = previous.pollInterval;
+            this.ordered = previous.ordered;
+            this.reuse = previous.reuse;
+            this.eventFilter = previous.eventFilter;
         }
         this.metadata = header.readMetadata(previousMetadata);
         this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset());
@@ -89,11 +92,11 @@
             ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
             parsers = factory.getParsers();
             typeMap = factory.getTypeMap();
+            updateEventParsers();
         } else {
             parsers = previous.parsers;
             typeMap = previous.typeMap;
         }
-        updateEventParsers();
         constantLookups.forEach(c -> c.newPool());
         fillConstantPools(0);
         constantLookups.forEach(c -> c.getLatestPool().setResolving());
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri May 31 20:44:28 2019 +0200
@@ -47,7 +47,7 @@
             super(acc);
         }
 
-        public void process() throws Exception, IOException {
+        public void process() throws IOException {
             this.location = EventSetLocation.current();
             this.eventSet = location.acquire(startNanos, null); // use timestamp
                                                                 // from
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Fri May 31 20:44:28 2019 +0200
@@ -52,7 +52,7 @@
         private boolean reuse = true;
         private RecordedEvent[] sortedList;
         private boolean ordered;
-        private boolean finished;
+
 
         public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
             super(acc);
@@ -60,12 +60,11 @@
         }
 
         @Override
-        public void process() throws Exception {
+        public void process() throws IOException {
             chunkParser = new ChunkParser(input, reuse);
-            while (!isClosed() && !finished) {
+            while (!isClosed()) {
                 boolean reuse = this.reuse;
                 boolean ordered = this.ordered;
-
                 chunkParser.setReuse(reuse);
                 chunkParser.setOrdered(ordered);
                 chunkParser.resetEventCache();
@@ -76,9 +75,14 @@
                 } else {
                     processUnordered();
                 }
+                if (chunkParser.isLastChunk()) {
+                    return;
+                }
+                chunkParser = chunkParser.nextChunkParser();
             }
         }
 
+
         private void processOrdered() throws IOException {
             if (sortedList == null) {
                 sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
@@ -92,11 +96,7 @@
                     for (int i = 0; i < index; i++) {
                         dispatch(sortedList[i]);
                     }
-                    event = findNext();
-                    if (event == null) {
-                        finished = true;
-                        return;
-                    }
+                    return;
                 }
                 if (index == sortedList.length) {
                     RecordedEvent[] tmp = sortedList;
@@ -112,28 +112,12 @@
             while (!isClosed()) {
                 event = chunkParser.readEvent();
                 if (event == null) {
-                    event = findNext();
-                    if (event == null) {
-                        finished = true;
-                        return;
-                    }
+                    return;
                 }
                 dispatch(event);
             }
         }
 
-        private RecordedEvent findNext() throws IOException {
-            RecordedEvent event = null;
-            while (event == null) {
-                if (chunkParser.isLastChunk()) {
-                    return null;
-                }
-                chunkParser = chunkParser.nextChunkParser();
-                event = chunkParser.readEvent();
-            }
-            return event;
-        }
-
         public void setReuse(boolean reuse) {
             this.reuse = reuse;
         }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Fri May 31 20:44:28 2019 +0200
@@ -171,6 +171,10 @@
     }
 
     public void setOrdered(boolean ordered) {
+        if (this.ordered == ordered) {
+            return;
+        }
        this.ordered = ordered;
+       this.index = 0;
     }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java	Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java	Fri May 31 20:44:28 2019 +0200
@@ -138,14 +138,14 @@
         dirtyFilter = true;
     }
 
-    public RecordedEvent[] readEvents(int index) throws Exception {
+    public RecordedEvent[] readEvents(int index) throws IOException {
         while (!closed) {
 
             RecordedEvent[] events = (RecordedEvent[]) segments[index];
             if (events != null) {
                 return events;
             }
-            if (lock.tryLock(250, TimeUnit.MILLISECONDS)) {
+            if (await()) {
                 try {
                     addSegment(index);
                 } finally {
@@ -156,6 +156,14 @@
         return null;
     }
 
+    private boolean await()  {
+        try {
+            return lock.tryLock(250, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            return false;
+        }
+    }
+
     // held with lock
     private void addSegment(int index) throws IOException {
         if (chunkParser == null) {
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Fri May 31 20:44:28 2019 +0200
@@ -131,15 +131,24 @@
         jvm.exclude(Thread.currentThread());
         try {
             process();
+        } catch (IOException e) {
+            if (!isClosed()) {
+                logException(e);
+            }
         } catch (Exception e) {
-            e.printStackTrace(); // for debugging purposes, remove before integration
-            Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
+            logException(e);
         } finally {
             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
         }
     }
 
-    public abstract void process() throws Exception;
+    private void logException(Exception e) {
+        e.printStackTrace(); // for debugging purposes, remove before
+        // integration
+        Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
+    }
+
+    public abstract void process() throws IOException;
 
     public synchronized boolean remove(Object action) {
         boolean remove = false;
--- a/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java	Thu May 30 23:12:44 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java	Fri May 31 20:44:28 2019 +0200
@@ -74,6 +74,7 @@
     }
 
     private static final int THREAD_COUNT = 4;
+    private static final boolean[] BOOLEAN_STATES = { false, true };
 
     public static void main(String... args) throws Exception {
         Path p = makeUnorderedRecording();
@@ -83,36 +84,42 @@
     }
 
     private static void testSetOrderedTrue(Path p) throws Exception {
-        AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
-        try (EventStream es = EventStream.openFile(p)) {
-            es.setOrdered(true);
-            es.onEvent(e -> {
-                Instant endTime = e.getEndTime();
-                if (endTime.isBefore(timestamp.get())) {
-                    throw new Error("Events are not ordered!");
-                }
-                timestamp.set(endTime);
-            });
-            es.start();
+        for (boolean reuse : BOOLEAN_STATES) {
+            AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
+            try (EventStream es = EventStream.openFile(p)) {
+                es.setReuse(reuse);
+                es.setOrdered(true);
+                es.onEvent(e -> {
+                    Instant endTime = e.getEndTime();
+                    if (endTime.isBefore(timestamp.get())) {
+                        throw new Error("Events are not ordered! Reues = " + reuse);
+                    }
+                    timestamp.set(endTime);
+                });
+                es.start();
+            }
         }
     }
 
     private static void testSetOrderedFalse(Path p) throws Exception {
-        AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
-        AtomicBoolean unoreded = new AtomicBoolean(false);
-        try (EventStream es = EventStream.openFile(p)) {
-            es.setOrdered(false);
-            es.onEvent(e -> {
-                Instant endTime = e.getEndTime();
-                if (endTime.isBefore(timestamp.get())) {
-                    unoreded.set(true);
-                    es.close();
+        for (boolean reuse : BOOLEAN_STATES) {
+            AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
+            AtomicBoolean unoreded = new AtomicBoolean(false);
+            try (EventStream es = EventStream.openFile(p)) {
+                es.setReuse(reuse);
+                es.setOrdered(false);
+                es.onEvent(e -> {
+                    Instant endTime = e.getEndTime();
+                    if (endTime.isBefore(timestamp.get())) {
+                        unoreded.set(true);
+                        es.close();
+                    }
+                    timestamp.set(endTime);
+                });
+                es.start();
+                if (!unoreded.get()) {
+                    throw new Exception("Expected at least some events to be out of order! Reues = " + reuse);
                 }
-                timestamp.set(endTime);
-            });
-            es.start();
-            if (!unoreded.get()) {
-                throw new Exception("Expected at least some events to be out of order");
             }
         }
     }
--- a/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java	Thu May 30 23:12:44 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java	Fri May 31 20:44:28 2019 +0200
@@ -31,7 +31,6 @@
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import jdk.jfr.Event;
 import jdk.jfr.Recording;
@@ -51,6 +50,8 @@
     static class ReuseEvent extends Event {
     }
 
+    private static final boolean[] BOOLEAN_STATES = { false, true };
+
     public static void main(String... args) throws Exception {
         Path p = makeRecording();
 
@@ -59,54 +60,66 @@
     }
 
     private static void testSetReuseFalse(Path p) throws Exception {
-        AtomicBoolean fail = new AtomicBoolean(false);
-        Map<RecordedEvent, RecordedEvent> identity = new IdentityHashMap<>();
-        try (EventStream es = EventStream.openFile(p)) {
-            es.setReuse(false);
-            es.onEvent(e -> {
-                if (identity.containsKey(e)) {
-                    fail.set(true);
-                    es.close();
-                }
-                identity.put(e, e);
-            });
-            es.start();
-        }
-        if (fail.get()) {
-            throw new Exception("Unexpected reuse!");
+        for (boolean ordered : BOOLEAN_STATES) {
+            AtomicBoolean fail = new AtomicBoolean(false);
+            Map<RecordedEvent, RecordedEvent> identity = new IdentityHashMap<>();
+            try (EventStream es = EventStream.openFile(p)) {
+                es.setOrdered(ordered);
+                es.setReuse(false);
+                es.onEvent(e -> {
+                    if (identity.containsKey(e)) {
+                        fail.set(true);
+                        es.close();
+                    }
+                    identity.put(e, e);
+                });
+                es.start();
+            }
+            if (fail.get()) {
+                throw new Exception("Unexpected reuse! Ordered = " + ordered);
+            }
+
         }
     }
 
     private static void testSetReuseTrue(Path p) throws Exception {
-        AtomicBoolean fail = new AtomicBoolean(false);
-        AtomicReference<RecordedEvent> event = new AtomicReference<RecordedEvent>(null);
-        try (EventStream es = EventStream.openFile(p)) {
-            es.setReuse(true);
-            es.onEvent(e -> {
-                if (event.get() == null) {
-                    event.set(e);
-                } else {
-                    if (e != event.get()) {
-                        fail.set(true);
+        for (boolean ordered : BOOLEAN_STATES) {
+            AtomicBoolean success = new AtomicBoolean(false);
+            Map<RecordedEvent, RecordedEvent> events = new IdentityHashMap<>();
+            try (EventStream es = EventStream.openFile(p)) {
+                es.setOrdered(ordered);
+                es.setReuse(true);
+                es.onEvent(e -> {
+                    if(events.containsKey(e)) {
+                        success.set(true);;
                         es.close();
                     }
-                }
-            });
-            es.start();
+                    events.put(e,e);
+                });
+                es.start();
+            }
+            if (!success.get()) {
+                throw new Exception("No reuse! Ordered = " + ordered);
+            }
         }
-        if (fail.get()) {
-            throw new Exception("No reuse!");
-        }
+
     }
 
     private static Path makeRecording() throws IOException {
         try (Recording r = new Recording()) {
             r.start();
-            for (int i = 0; i < 1_000; i++) {
+            for (int i = 0; i < 5; i++) {
+                ReuseEvent e = new ReuseEvent();
+                e.commit();
+            }
+            Recording rotation = new Recording();
+            rotation.start();
+            for (int i = 0; i < 5; i++) {
                 ReuseEvent e = new ReuseEvent();
                 e.commit();
             }
             r.stop();
+            rotation.close();
             Path p = Files.createTempFile("recording", ".jfr");
             r.dump(p);
             return p;