setOrdered and setReuse implemented for file stream, incl. unit tests
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Thu May 30 23:04:50 2019 +0200
@@ -62,16 +62,14 @@
private boolean chunkFinished;
private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
private boolean reuse;
+ private boolean ordered;
+ private boolean resetEventCache;
public ChunkParser(RecordingInput input, boolean reuse) throws IOException {
this(new ChunkHeader(input), null, 500);
this.reuse = reuse;
}
- public void setReuse(boolean resue) {
- this.reuse = resue;
- updateParsers();
- }
private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException {
this.input = header.getInput();
@@ -95,7 +93,7 @@
parsers = previous.parsers;
typeMap = previous.typeMap;
}
- updateParsers();
+ updateEventParsers();
constantLookups.forEach(c -> c.newPool());
fillConstantPools(0);
constantLookups.forEach(c -> c.getLatestPool().setResolving());
@@ -116,23 +114,7 @@
return this.eventFilter;
}
- private void updateParsers() {
- parsers.forEach(p -> {
- if (p instanceof EventParser) {
- EventParser ep = (EventParser) p;
- if (reuse) {
- ep.setReuse(true);
- }
- long threshold = eventFilter.getThreshold(ep.getEventType().getName());
- if (threshold >= 0) {
- ep.setEnabled(true);
- ep.setThreshold(timeConverter.convertDurationNanos(threshold));
- } else {
- ep.setThreshold(-1L);
- }
- }
- });
- }
+
/**
* Reads an event and returns null when segment or chunk ends.
@@ -163,7 +145,7 @@
ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
parsers = factory.getParsers();
typeMap = factory.getTypeMap();
- updateParsers();
+ updateEventParsers();
}
if (contantPosition != chunkHeader.getConstantPoolPosition()) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
@@ -356,4 +338,43 @@
public boolean isChunkFinished() {
return chunkFinished;
}
+
+ // Need to call updateEventParsers() for
+ // change to take effect
+ public void setReuse(boolean resue) {
+ this.reuse = resue;
+ }
+
+ // Need to call updateEventParsers() for
+ // change to take effect
+ public void setOrdered(boolean ordered) {
+ this.ordered = ordered;
+ }
+
+ // Need to call updateEventParsers() for
+ // change to take effect
+ public void resetEventCache() {
+ this.resetEventCache = true;
+ }
+
+ public void updateEventParsers() {
+ parsers.forEach(p -> {
+ if (p instanceof EventParser) {
+ EventParser ep = (EventParser) p;
+ ep.setOrdered(ordered);
+ ep.setReuse(reuse);
+ if (resetEventCache) {
+ ep.resetCache();
+ }
+ long threshold = eventFilter.getThreshold(ep.getEventType().getName());
+ if (threshold >= 0) {
+ ep.setEnabled(true);
+ ep.setThreshold(timeConverter.convertDurationNanos(threshold));
+ } else {
+ ep.setThreshold(-1L);
+ }
+ }
+ });
+ resetEventCache = false;
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu May 30 23:04:50 2019 +0200
@@ -181,4 +181,11 @@
public void setReuse(boolean reuse) {
eventConsumer.setReuse(reuse);
}
+
+ @Override
+ public void setOrdered(boolean ordered) {
+ if (ordered == false) {
+ throw new UnsupportedOperationException("Unordered not implemented yet");
+ }
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu May 30 23:04:50 2019 +0200
@@ -30,6 +30,8 @@
import java.security.AccessControlContext;
import java.security.AccessController;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.Objects;
import java.util.function.Consumer;
@@ -43,9 +45,14 @@
final class EventFileStream implements EventStream {
private final static class FileEventConsumer extends EventConsumer {
+ private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
+ private static final int DEFAULT_ARRAY_SIZE = 100_000;
private final RecordingInput input;
private ChunkParser chunkParser;
private boolean reuse = true;
+ private RecordedEvent[] sortedList;
+ private boolean ordered;
+ private boolean finished;
public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
super(acc);
@@ -55,13 +62,59 @@
@Override
public void process() throws Exception {
chunkParser = new ChunkParser(input, reuse);
- chunkParser.setReuse(reuse);
+ while (!isClosed() && !finished) {
+ boolean reuse = this.reuse;
+ boolean ordered = this.ordered;
+
+ chunkParser.setReuse(reuse);
+ chunkParser.setOrdered(ordered);
+ chunkParser.resetEventCache();
+ chunkParser.updateEventParsers();
+
+ if (ordered) {
+ processOrdered();
+ } else {
+ processUnordered();
+ }
+ }
+ }
+
+ private void processOrdered() throws IOException {
+ if (sortedList == null) {
+ sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
+ }
RecordedEvent event;
+ int index = 0;
while (true) {
event = chunkParser.readEvent();
if (event == null) {
+ Arrays.sort(sortedList, 0, index, END_TIME);
+ for (int i = 0; i < index; i++) {
+ dispatch(sortedList[i]);
+ }
+ event = findNext();
+ if (event == null) {
+ finished = true;
+ return;
+ }
+ }
+ if (index == sortedList.length) {
+ RecordedEvent[] tmp = sortedList;
+ sortedList = new RecordedEvent[2 * tmp.length];
+ System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
+ }
+ sortedList[index++] = event;
+ }
+ }
+
+ private void processUnordered() throws IOException {
+ RecordedEvent event;
+ while (!isClosed()) {
+ event = chunkParser.readEvent();
+ if (event == null) {
event = findNext();
if (event == null) {
+ finished = true;
return;
}
}
@@ -82,11 +135,11 @@
}
public void setReuse(boolean reuse) {
- if (chunkParser == null) {
- this.reuse = reuse;
- } else {
- chunkParser.setReuse(reuse);
- }
+ this.reuse = reuse;
+ }
+
+ public void setOrdered(boolean ordered) {
+ this.ordered = ordered;
}
}
@@ -165,4 +218,9 @@
public void awaitTermination() {
eventConsumer.awaitTermination();
}
+
+ @Override
+ public void setOrdered(boolean ordered) {
+ eventConsumer.setOrdered(ordered);
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Thu May 30 23:04:50 2019 +0200
@@ -46,10 +46,11 @@
private final boolean hasDuration;
private final List<ValueDescriptor> valueDescriptors;
private final int startIndex;
- private final RecordedEvent event;
private long thresholdTicks = -1;
private boolean enabled = true;
- private boolean reuse;
+ private RecordedEvent[] eventCache;
+ private int index;
+ private boolean ordered;
EventParser(TimeConverter timeConverter, EventType type, Parser[] parsers) {
this.timeConverter = timeConverter;
@@ -58,7 +59,23 @@
this.hasDuration = type.getField(FIELD_DURATION) != null;
this.startIndex = hasDuration ? 2 : 1;
this.valueDescriptors = type.getFields();
- this.event = new RecordedEvent(type, valueDescriptors, new Object[parsers.length], 0L, 0L, timeConverter);
+ }
+
+ private RecordedEvent cachedEvent() {
+ if (index == eventCache.length) {
+ RecordedEvent[] cache = eventCache;
+ eventCache = new RecordedEvent[eventCache.length * 2];
+ System.arraycopy(cache, 0, eventCache, 0, cache.length);
+ }
+ RecordedEvent event = eventCache[index];
+ if (event == null) {
+ event = new RecordedEvent(eventType, valueDescriptors, new Object[parsers.length], 0L, 0L, timeConverter);
+ eventCache[index] = event;
+ }
+ if (ordered) {
+ index++;
+ }
+ return event;
}
public EventType getEventType() {
@@ -87,7 +104,8 @@
return null;
}
}
- if (reuse) {
+ if (eventCache != null) {
+ RecordedEvent event = cachedEvent();
Object[] values = event.objects;
for (int i = startIndex; i < parsers.length; i++) {
values[i] = parsers[i].parse(input);
@@ -124,7 +142,7 @@
}
}
}
- return event;
+ return null;
}
@Override
@@ -132,8 +150,27 @@
throw new InternalError("Should not call this method. More efficent to read event size and skip ahead");
}
- public void setReuse(boolean reuse) {
- this.reuse = reuse;
+ public void resetCache() {
+ index = 0;
+ }
+
+ public boolean hasReuse() {
+ return eventCache != null;
}
+ public void setReuse(boolean reuse) {
+ if (reuse == hasReuse()) {
+ return;
+ }
+ if (reuse) {
+ eventCache = new RecordedEvent[2];
+ index = 0;
+ } else {
+ eventCache = null;
+ }
+ }
+
+ public void setOrdered(boolean ordered) {
+ this.ordered = ordered;
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Thu May 30 23:04:50 2019 +0200
@@ -139,19 +139,32 @@
boolean remove(Object action);
/**
- * Hint that the the event object in an {@link #onEvent(Consumer)} action
+ * Hint that the event object in an {@link #onEvent(Consumer)} action
* may be reused.
* <p>
- * If reuse is set to
- * {@code true), a callback should not keep a reference to the event object
- * after the callback from {@code onEvent} has returned.
+ * If reuse is set to {@code true), a callback should not keep a reference
+ * to the event object after the callback from {@code onEvent} has returned.
* <p>
- * By default reuse is {@code true}
+ * By default reuse is set to {@code true}
+ *
+ * @param resuse if event objects can be reused between calls to {@code #onEvent(Consumer)}
*
*/
public void setReuse(boolean reuse);
/**
+ * Orders events in chronological order aft the end timestamp
+ *
+ * TODO: WHAT ABOUT EVENTS THAT OCCUR WAY OUT OF ORDER
+ * <p>
+ * By default ordered is set to {@code true}
+ *
+ * @param ordered if event objects arrive in chronological order to {@code #onEvent(Consumer)}
+ */
+ public void setOrdered(boolean ordered);
+
+
+ /**
* Starts processing events in the stream.
* <p>
* All actions will performed on this stream will happen in the current
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Thu May 30 23:04:50 2019 +0200
@@ -299,6 +299,11 @@
@Override
public void setReuse(boolean reuse) {
- // hint is ignored
+ throw new UnsupportedOperationException("Unordered not implemented yet");
+ }
+
+ @Override
+ public void setOrdered(boolean ordered) {
+ stream.setOrdered(ordered);
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Mon May 27 23:05:54 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Thu May 30 23:04:50 2019 +0200
@@ -131,9 +131,9 @@
jvm.exclude(Thread.currentThread());
try {
process();
- } catch (Throwable e) {
- e.printStackTrace();
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpectedexception iterating consumer.");
+ } catch (Exception e) {
+ e.printStackTrace(); // for debugging purposes, remove before integration
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
} finally {
Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
}
@@ -185,7 +185,12 @@
dispatcher.put(e.getEventType().getId(), consumerDispatch);
}
for (int i = 0; i < consumerDispatch.length; i++) {
- consumerDispatch[i].offer(e);
+ try {
+ consumerDispatch[i].offer(e);
+ } catch (Exception exception) {
+ // Is this a reasonable behavior for an exception?
+ // Error will abort the stream.
+ }
}
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Thu May 30 23:04:50 2019 +0200
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.api.consumer.filestream;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import jdk.jfr.Event;
+import jdk.jfr.Recording;
+import jdk.jfr.consumer.EventStream;
+
+/**
+ * @test
+ * @summary Test EventStream::setOrdered(...)
+ * @key jfr
+ * @requires vm.hasJFR
+ * @library /test/lib
+ * @run main/othervm jdk.jfr.api.consumer.filestream.TestOrdered
+ */
+public class TestOrdered {
+
+ static class OrderedEvent extends Event {
+ }
+
+ static class Emitter extends Thread {
+ private final CyclicBarrier barrier;
+
+ public Emitter(CyclicBarrier barrier) {
+ this.barrier = barrier;
+ }
+
+ @Override
+ public void run() {
+ OrderedEvent e1 = new OrderedEvent();
+ e1.commit();
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Unexpected exception in barrier");
+ }
+ OrderedEvent e2 = new OrderedEvent();
+ e2.commit();
+ }
+ }
+
+ private static final int THREAD_COUNT = 4;
+
+ public static void main(String... args) throws Exception {
+ Path p = makeUnorderedRecording();
+
+ testSetOrderedTrue(p);
+ testSetOrderedFalse(p);
+ }
+
+ private static void testSetOrderedTrue(Path p) throws Exception {
+ AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
+ try (EventStream es = EventStream.openFile(p)) {
+ es.setOrdered(true);
+ es.onEvent(e -> {
+ Instant endTime = e.getEndTime();
+ if (endTime.isBefore(timestamp.get())) {
+ throw new Error("Events are not ordered!");
+ }
+ timestamp.set(endTime);
+ });
+ es.start();
+ }
+ }
+
+ private static void testSetOrderedFalse(Path p) throws Exception {
+ AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
+ AtomicBoolean unoreded = new AtomicBoolean(false);
+ try (EventStream es = EventStream.openFile(p)) {
+ es.setOrdered(false);
+ es.onEvent(e -> {
+ Instant endTime = e.getEndTime();
+ if (endTime.isBefore(timestamp.get())) {
+ unoreded.set(true);
+ es.close();
+ }
+ timestamp.set(endTime);
+ });
+ es.start();
+ if (!unoreded.get()) {
+ throw new Exception("Expected at least some events to be out of order");
+ }
+ }
+ }
+
+ private static Path makeUnorderedRecording() throws Exception {
+ CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT + 1);
+
+ try (Recording r = new Recording()) {
+ r.start();
+ List<Emitter> emitters = new ArrayList<>();
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ Emitter e = new Emitter(barrier);
+ e.start();
+ emitters.add(e);
+ }
+ // Thread buffers should now have one event each
+ barrier.await();
+ // Add another event to each thread buffer, so
+ // events are bound to come out of order when they
+ // are flushed
+ for (Emitter e : emitters) {
+ e.join();
+ }
+ r.stop();
+ Path p = Files.createTempFile("recording", ".jfr");
+ r.dump(p);
+ return p;
+ }
+ }
+}
--- a/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java Mon May 27 23:05:54 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java Thu May 30 23:04:50 2019 +0200
@@ -31,6 +31,7 @@
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import jdk.jfr.Event;
import jdk.jfr.Recording;
@@ -67,34 +68,34 @@
fail.set(true);
throw new Error("Unexpected reuse!");
}
- identity.put(e,e);
+ identity.put(e, e);
});
es.start();
}
if (fail.get()) {
- throw new Exception("Unexpected resued");
+ throw new Exception("Unexpected reuse!");
}
}
private static void testSetReuseTrue(Path p) throws Exception {
AtomicBoolean fail = new AtomicBoolean(false);
+ AtomicReference<RecordedEvent> event = new AtomicReference<RecordedEvent>(null);
try (EventStream es = EventStream.openFile(p)) {
es.setReuse(true);
- RecordedEvent[] events = new RecordedEvent[1];
es.onEvent(e -> {
- if (events[0] == null) {
- events[0] = e;
+ if (event.get() == null) {
+ event.set(e);
} else {
- if (e != events[0]) {
+ if (e != event.get()) {
fail.set(true);
- throw new Error("No reuse");
+ throw new Error("No reuse!");
}
}
});
es.start();
}
if (fail.get()) {
- throw new Exception("No reuse");
+ throw new Exception("No reuse!");
}
}