setOrdered and setReuse implemented for file stream, incl. unit tests JEP-349-branch
authoregahlin
Thu, 30 May 2019 23:04:50 +0200
branchJEP-349-branch
changeset 57380 6a7e7743b82f
parent 57378 2e35a025ebee
child 57381 ce265e404c64
setOrdered and setReuse implemented for file stream, incl. unit tests
src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java
test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java
test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Thu May 30 23:04:50 2019 +0200
@@ -62,16 +62,14 @@
     private boolean chunkFinished;
     private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
     private boolean reuse;
+    private boolean ordered;
+    private boolean resetEventCache;
 
     public ChunkParser(RecordingInput input, boolean reuse) throws IOException {
         this(new ChunkHeader(input), null, 500);
        this.reuse = reuse;
     }
 
-    public void setReuse(boolean resue) {
-        this.reuse = resue;
-        updateParsers();
-    }
 
     private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException {
         this.input = header.getInput();
@@ -95,7 +93,7 @@
             parsers = previous.parsers;
             typeMap = previous.typeMap;
         }
-        updateParsers();
+        updateEventParsers();
         constantLookups.forEach(c -> c.newPool());
         fillConstantPools(0);
         constantLookups.forEach(c -> c.getLatestPool().setResolving());
@@ -116,23 +114,7 @@
         return this.eventFilter;
     }
 
-    private void updateParsers() {
-        parsers.forEach(p -> {
-            if (p instanceof EventParser) {
-                EventParser ep = (EventParser) p;
-                if (reuse) {
-                    ep.setReuse(true);
-                }
-                long threshold = eventFilter.getThreshold(ep.getEventType().getName());
-                if (threshold >= 0) {
-                    ep.setEnabled(true);
-                    ep.setThreshold(timeConverter.convertDurationNanos(threshold));
-                } else {
-                    ep.setThreshold(-1L);
-                }
-            }
-        });
-    }
+
 
     /**
      * Reads an event and returns null when segment or chunk ends.
@@ -163,7 +145,7 @@
                 ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
                 parsers = factory.getParsers();
                 typeMap = factory.getTypeMap();
-                updateParsers();
+                updateEventParsers();
             }
             if (contantPosition != chunkHeader.getConstantPoolPosition()) {
                 Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
@@ -356,4 +338,43 @@
     public boolean isChunkFinished() {
         return chunkFinished;
     }
+
+    // Need to call updateEventParsers() for
+    // change to take effect
+    public void setReuse(boolean resue) {
+        this.reuse = resue;
+    }
+
+    // Need to call updateEventParsers() for
+    // change to take effect
+    public void setOrdered(boolean ordered) {
+        this.ordered = ordered;
+    }
+
+    // Need to call updateEventParsers() for
+    // change to take effect
+    public void resetEventCache() {
+        this.resetEventCache = true;
+    }
+
+    public void updateEventParsers() {
+        parsers.forEach(p -> {
+            if (p instanceof EventParser) {
+                EventParser ep = (EventParser) p;
+                ep.setOrdered(ordered);
+                ep.setReuse(reuse);
+                if (resetEventCache) {
+                    ep.resetCache();
+                }
+                long threshold = eventFilter.getThreshold(ep.getEventType().getName());
+                if (threshold >= 0) {
+                    ep.setEnabled(true);
+                    ep.setThreshold(timeConverter.convertDurationNanos(threshold));
+                } else {
+                    ep.setThreshold(-1L);
+                }
+            }
+        });
+        resetEventCache = false;
+    }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Thu May 30 23:04:50 2019 +0200
@@ -181,4 +181,11 @@
     public void setReuse(boolean reuse) {
         eventConsumer.setReuse(reuse);
     }
+
+    @Override
+    public void setOrdered(boolean ordered) {
+      if (ordered == false) {
+          throw new UnsupportedOperationException("Unordered not implemented yet");
+      }
+    }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Thu May 30 23:04:50 2019 +0200
@@ -30,6 +30,8 @@
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Objects;
 import java.util.function.Consumer;
 
@@ -43,9 +45,14 @@
 final class EventFileStream implements EventStream {
 
     private final static class FileEventConsumer extends EventConsumer {
+        private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
+        private static final int DEFAULT_ARRAY_SIZE = 100_000;
         private final RecordingInput input;
         private ChunkParser chunkParser;
         private boolean reuse = true;
+        private RecordedEvent[] sortedList;
+        private boolean ordered;
+        private boolean finished;
 
         public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
             super(acc);
@@ -55,13 +62,59 @@
         @Override
         public void process() throws Exception {
             chunkParser = new ChunkParser(input, reuse);
-            chunkParser.setReuse(reuse);
+            while (!isClosed() && !finished) {
+                boolean reuse = this.reuse;
+                boolean ordered = this.ordered;
+
+                chunkParser.setReuse(reuse);
+                chunkParser.setOrdered(ordered);
+                chunkParser.resetEventCache();
+                chunkParser.updateEventParsers();
+
+                if (ordered) {
+                    processOrdered();
+                } else {
+                    processUnordered();
+                }
+            }
+        }
+
+        private void processOrdered() throws IOException {
+            if (sortedList == null) {
+                sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
+            }
             RecordedEvent event;
+            int index = 0;
             while (true) {
                 event = chunkParser.readEvent();
                 if (event == null) {
+                    Arrays.sort(sortedList, 0, index, END_TIME);
+                    for (int i = 0; i < index; i++) {
+                        dispatch(sortedList[i]);
+                    }
+                    event = findNext();
+                    if (event == null) {
+                        finished = true;
+                        return;
+                    }
+                }
+                if (index == sortedList.length) {
+                    RecordedEvent[] tmp = sortedList;
+                    sortedList = new RecordedEvent[2 * tmp.length];
+                    System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
+                }
+                sortedList[index++] = event;
+            }
+        }
+
+        private void processUnordered() throws IOException {
+            RecordedEvent event;
+            while (!isClosed()) {
+                event = chunkParser.readEvent();
+                if (event == null) {
                     event = findNext();
                     if (event == null) {
+                        finished = true;
                         return;
                     }
                 }
@@ -82,11 +135,11 @@
         }
 
         public void setReuse(boolean reuse) {
-            if (chunkParser == null) {
-                this.reuse = reuse;
-            } else {
-                chunkParser.setReuse(reuse);
-            }
+            this.reuse = reuse;
+        }
+
+        public void setOrdered(boolean ordered) {
+            this.ordered = ordered;
         }
     }
 
@@ -165,4 +218,9 @@
     public void awaitTermination() {
         eventConsumer.awaitTermination();
     }
+
+    @Override
+    public void setOrdered(boolean ordered) {
+        eventConsumer.setOrdered(ordered);
+    }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Thu May 30 23:04:50 2019 +0200
@@ -46,10 +46,11 @@
     private final boolean hasDuration;
     private final List<ValueDescriptor> valueDescriptors;
     private final int startIndex;
-    private final RecordedEvent event;
     private long thresholdTicks = -1;
     private boolean enabled = true;
-    private boolean reuse;
+    private RecordedEvent[] eventCache;
+    private int index;
+    private boolean ordered;
 
     EventParser(TimeConverter timeConverter, EventType type, Parser[] parsers) {
         this.timeConverter = timeConverter;
@@ -58,7 +59,23 @@
         this.hasDuration = type.getField(FIELD_DURATION) != null;
         this.startIndex = hasDuration ? 2 : 1;
         this.valueDescriptors = type.getFields();
-        this.event = new RecordedEvent(type, valueDescriptors, new Object[parsers.length], 0L, 0L, timeConverter);
+    }
+
+    private RecordedEvent cachedEvent() {
+        if (index == eventCache.length) {
+            RecordedEvent[] cache = eventCache;
+            eventCache = new RecordedEvent[eventCache.length * 2];
+            System.arraycopy(cache, 0, eventCache, 0, cache.length);
+        }
+        RecordedEvent event = eventCache[index];
+        if (event == null) {
+            event = new RecordedEvent(eventType, valueDescriptors, new Object[parsers.length], 0L, 0L, timeConverter);
+            eventCache[index] = event;
+        }
+        if (ordered) {
+            index++;
+        }
+        return event;
     }
 
     public EventType getEventType() {
@@ -87,7 +104,8 @@
                     return null;
                 }
             }
-            if (reuse) {
+            if (eventCache != null) {
+                RecordedEvent event = cachedEvent();
                 Object[] values = event.objects;
                 for (int i = startIndex; i < parsers.length; i++) {
                     values[i] = parsers[i].parse(input);
@@ -124,7 +142,7 @@
                 }
             }
         }
-        return event;
+        return null;
     }
 
     @Override
@@ -132,8 +150,27 @@
         throw new InternalError("Should not call this method. More efficent to read event size and skip ahead");
     }
 
-    public void setReuse(boolean reuse) {
-        this.reuse = reuse;
+    public void resetCache() {
+        index = 0;
+    }
+
+    public boolean hasReuse() {
+        return eventCache != null;
     }
 
+    public void setReuse(boolean reuse) {
+        if (reuse == hasReuse()) {
+            return;
+        }
+        if (reuse) {
+            eventCache = new RecordedEvent[2];
+            index = 0;
+        } else {
+            eventCache = null;
+        }
+    }
+
+    public void setOrdered(boolean ordered) {
+       this.ordered = ordered;
+    }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Thu May 30 23:04:50 2019 +0200
@@ -139,19 +139,32 @@
     boolean remove(Object action);
 
     /**
-     * Hint that the the event object in an {@link #onEvent(Consumer)} action
+     * Hint that the event object in an {@link #onEvent(Consumer)} action
      * may be reused.
      * <p>
-     * If reuse is set to
-     * {@code true), a callback should not keep a reference to the event object
-     * after the callback from {@code onEvent} has returned.
+     * If reuse is set to {@code true), a callback should not keep a reference
+     * to the event object after the callback from {@code onEvent} has returned.
      * <p>
-     * By default reuse is {@code true}
+     * By default reuse is set to {@code true}
+     *
+     * @param resuse if event objects can be reused between calls to {@code #onEvent(Consumer)}
      *
      */
     public void setReuse(boolean reuse);
 
     /**
+     * Orders events in chronological order aft the end timestamp
+     *
+     * TODO: WHAT ABOUT EVENTS THAT OCCUR WAY OUT OF ORDER
+     * <p>
+     * By default ordered is set to {@code true}
+     *
+     * @param ordered if event objects arrive in chronological order to {@code #onEvent(Consumer)}
+     */
+    public void setOrdered(boolean ordered);
+
+
+    /**
      * Starts processing events in the stream.
      * <p>
      * All actions will performed on this stream will happen in the current
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Thu May 30 23:04:50 2019 +0200
@@ -299,6 +299,11 @@
 
     @Override
     public void setReuse(boolean reuse) {
-     // hint is ignored
+        throw new UnsupportedOperationException("Unordered not implemented yet");
+    }
+
+    @Override
+    public void setOrdered(boolean ordered) {
+        stream.setOrdered(ordered);
     }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Thu May 30 23:04:50 2019 +0200
@@ -131,9 +131,9 @@
         jvm.exclude(Thread.currentThread());
         try {
             process();
-        } catch (Throwable e) {
-            e.printStackTrace();
-            Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpectedexception iterating consumer.");
+        } catch (Exception e) {
+            e.printStackTrace(); // for debugging purposes, remove before integration
+            Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
         } finally {
             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
         }
@@ -185,7 +185,12 @@
             dispatcher.put(e.getEventType().getId(), consumerDispatch);
         }
         for (int i = 0; i < consumerDispatch.length; i++) {
-            consumerDispatch[i].offer(e);
+            try {
+                consumerDispatch[i].offer(e);
+            } catch (Exception exception) {
+                // Is this a reasonable behavior for an exception?
+                // Error will abort the stream.
+            }
         }
 
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java	Thu May 30 23:04:50 2019 +0200
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.api.consumer.filestream;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import jdk.jfr.Event;
+import jdk.jfr.Recording;
+import jdk.jfr.consumer.EventStream;
+
+/**
+ * @test
+ * @summary Test EventStream::setOrdered(...)
+ * @key jfr
+ * @requires vm.hasJFR
+ * @library /test/lib
+ * @run main/othervm jdk.jfr.api.consumer.filestream.TestOrdered
+ */
+public class TestOrdered {
+
+    static class OrderedEvent extends Event {
+    }
+
+    static class Emitter extends Thread {
+        private final CyclicBarrier barrier;
+
+        public Emitter(CyclicBarrier barrier) {
+            this.barrier = barrier;
+        }
+
+        @Override
+        public void run() {
+            OrderedEvent e1 = new OrderedEvent();
+            e1.commit();
+            try {
+                barrier.await();
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new Error("Unexpected exception in barrier");
+            }
+            OrderedEvent e2 = new OrderedEvent();
+            e2.commit();
+        }
+    }
+
+    private static final int THREAD_COUNT = 4;
+
+    public static void main(String... args) throws Exception {
+        Path p = makeUnorderedRecording();
+
+        testSetOrderedTrue(p);
+        testSetOrderedFalse(p);
+    }
+
+    private static void testSetOrderedTrue(Path p) throws Exception {
+        AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
+        try (EventStream es = EventStream.openFile(p)) {
+            es.setOrdered(true);
+            es.onEvent(e -> {
+                Instant endTime = e.getEndTime();
+                if (endTime.isBefore(timestamp.get())) {
+                    throw new Error("Events are not ordered!");
+                }
+                timestamp.set(endTime);
+            });
+            es.start();
+        }
+    }
+
+    private static void testSetOrderedFalse(Path p) throws Exception {
+        AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
+        AtomicBoolean unoreded = new AtomicBoolean(false);
+        try (EventStream es = EventStream.openFile(p)) {
+            es.setOrdered(false);
+            es.onEvent(e -> {
+                Instant endTime = e.getEndTime();
+                if (endTime.isBefore(timestamp.get())) {
+                    unoreded.set(true);
+                    es.close();
+                }
+                timestamp.set(endTime);
+            });
+            es.start();
+            if (!unoreded.get()) {
+                throw new Exception("Expected at least some events to be out of order");
+            }
+        }
+    }
+
+    private static Path makeUnorderedRecording() throws Exception {
+        CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT + 1);
+
+        try (Recording r = new Recording()) {
+            r.start();
+            List<Emitter> emitters = new ArrayList<>();
+            for (int i = 0; i < THREAD_COUNT; i++) {
+                Emitter e = new Emitter(barrier);
+                e.start();
+                emitters.add(e);
+            }
+            // Thread buffers should now have one event each
+            barrier.await();
+            // Add another event to each thread buffer, so
+            // events are bound to come out of order when they
+            // are flushed
+            for (Emitter e : emitters) {
+                e.join();
+            }
+            r.stop();
+            Path p = Files.createTempFile("recording", ".jfr");
+            r.dump(p);
+            return p;
+        }
+    }
+}
--- a/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java	Mon May 27 23:05:54 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java	Thu May 30 23:04:50 2019 +0200
@@ -31,6 +31,7 @@
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import jdk.jfr.Event;
 import jdk.jfr.Recording;
@@ -67,34 +68,34 @@
                     fail.set(true);
                     throw new Error("Unexpected reuse!");
                 }
-                identity.put(e,e);
+                identity.put(e, e);
             });
             es.start();
         }
         if (fail.get()) {
-            throw new Exception("Unexpected resued");
+            throw new Exception("Unexpected reuse!");
         }
     }
 
     private static void testSetReuseTrue(Path p) throws Exception {
         AtomicBoolean fail = new AtomicBoolean(false);
+        AtomicReference<RecordedEvent> event = new AtomicReference<RecordedEvent>(null);
         try (EventStream es = EventStream.openFile(p)) {
             es.setReuse(true);
-            RecordedEvent[] events = new RecordedEvent[1];
             es.onEvent(e -> {
-                if (events[0] == null) {
-                    events[0] = e;
+                if (event.get() == null) {
+                    event.set(e);
                 } else {
-                    if (e != events[0]) {
+                    if (e != event.get()) {
                         fail.set(true);
-                        throw new Error("No reuse");
+                        throw new Error("No reuse!");
                     }
                 }
             });
             es.start();
         }
         if (fail.get()) {
-            throw new Exception("No reuse");
+            throw new Exception("No reuse!");
         }
     }