Make setReuse and setOrdered work across chunk boundaries. Improved unit tests
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri May 31 20:44:28 2019 +0200
@@ -82,6 +82,9 @@
this.constantLookups = previous.constantLookups;
this.previousMetadata = previous.metadata;
this.pollInterval = previous.pollInterval;
+ this.ordered = previous.ordered;
+ this.reuse = previous.reuse;
+ this.eventFilter = previous.eventFilter;
}
this.metadata = header.readMetadata(previousMetadata);
this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset());
@@ -89,11 +92,11 @@
ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
parsers = factory.getParsers();
typeMap = factory.getTypeMap();
+ updateEventParsers();
} else {
parsers = previous.parsers;
typeMap = previous.typeMap;
}
- updateEventParsers();
constantLookups.forEach(c -> c.newPool());
fillConstantPools(0);
constantLookups.forEach(c -> c.getLatestPool().setResolving());
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri May 31 20:44:28 2019 +0200
@@ -47,7 +47,7 @@
super(acc);
}
- public void process() throws Exception, IOException {
+ public void process() throws IOException {
this.location = EventSetLocation.current();
this.eventSet = location.acquire(startNanos, null); // use timestamp
// from
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri May 31 20:44:28 2019 +0200
@@ -52,7 +52,7 @@
private boolean reuse = true;
private RecordedEvent[] sortedList;
private boolean ordered;
- private boolean finished;
+
public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
super(acc);
@@ -60,12 +60,11 @@
}
@Override
- public void process() throws Exception {
+ public void process() throws IOException {
chunkParser = new ChunkParser(input, reuse);
- while (!isClosed() && !finished) {
+ while (!isClosed()) {
boolean reuse = this.reuse;
boolean ordered = this.ordered;
-
chunkParser.setReuse(reuse);
chunkParser.setOrdered(ordered);
chunkParser.resetEventCache();
@@ -76,9 +75,14 @@
} else {
processUnordered();
}
+ if (chunkParser.isLastChunk()) {
+ return;
+ }
+ chunkParser = chunkParser.nextChunkParser();
}
}
+
private void processOrdered() throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
@@ -92,11 +96,7 @@
for (int i = 0; i < index; i++) {
dispatch(sortedList[i]);
}
- event = findNext();
- if (event == null) {
- finished = true;
- return;
- }
+ return;
}
if (index == sortedList.length) {
RecordedEvent[] tmp = sortedList;
@@ -112,28 +112,12 @@
while (!isClosed()) {
event = chunkParser.readEvent();
if (event == null) {
- event = findNext();
- if (event == null) {
- finished = true;
- return;
- }
+ return;
}
dispatch(event);
}
}
- private RecordedEvent findNext() throws IOException {
- RecordedEvent event = null;
- while (event == null) {
- if (chunkParser.isLastChunk()) {
- return null;
- }
- chunkParser = chunkParser.nextChunkParser();
- event = chunkParser.readEvent();
- }
- return event;
- }
-
public void setReuse(boolean reuse) {
this.reuse = reuse;
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Fri May 31 20:44:28 2019 +0200
@@ -171,6 +171,10 @@
}
public void setOrdered(boolean ordered) {
+ if (this.ordered == ordered) {
+ return;
+ }
this.ordered = ordered;
+ this.index = 0;
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java Fri May 31 20:44:28 2019 +0200
@@ -138,14 +138,14 @@
dirtyFilter = true;
}
- public RecordedEvent[] readEvents(int index) throws Exception {
+ public RecordedEvent[] readEvents(int index) throws IOException {
while (!closed) {
RecordedEvent[] events = (RecordedEvent[]) segments[index];
if (events != null) {
return events;
}
- if (lock.tryLock(250, TimeUnit.MILLISECONDS)) {
+ if (await()) {
try {
addSegment(index);
} finally {
@@ -156,6 +156,14 @@
return null;
}
+ private boolean await() {
+ try {
+ return lock.tryLock(250, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
// held with lock
private void addSegment(int index) throws IOException {
if (chunkParser == null) {
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Thu May 30 23:12:44 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Fri May 31 20:44:28 2019 +0200
@@ -131,15 +131,24 @@
jvm.exclude(Thread.currentThread());
try {
process();
+ } catch (IOException e) {
+ if (!isClosed()) {
+ logException(e);
+ }
} catch (Exception e) {
- e.printStackTrace(); // for debugging purposes, remove before integration
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
+ logException(e);
} finally {
Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
}
}
- public abstract void process() throws Exception;
+ private void logException(Exception e) {
+ e.printStackTrace(); // for debugging purposes, remove before
+ // integration
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
+ }
+
+ public abstract void process() throws IOException;
public synchronized boolean remove(Object action) {
boolean remove = false;
--- a/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Thu May 30 23:12:44 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Fri May 31 20:44:28 2019 +0200
@@ -74,6 +74,7 @@
}
private static final int THREAD_COUNT = 4;
+ private static final boolean[] BOOLEAN_STATES = { false, true };
public static void main(String... args) throws Exception {
Path p = makeUnorderedRecording();
@@ -83,36 +84,42 @@
}
private static void testSetOrderedTrue(Path p) throws Exception {
- AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
- try (EventStream es = EventStream.openFile(p)) {
- es.setOrdered(true);
- es.onEvent(e -> {
- Instant endTime = e.getEndTime();
- if (endTime.isBefore(timestamp.get())) {
- throw new Error("Events are not ordered!");
- }
- timestamp.set(endTime);
- });
- es.start();
+ for (boolean reuse : BOOLEAN_STATES) {
+ AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
+ try (EventStream es = EventStream.openFile(p)) {
+ es.setReuse(reuse);
+ es.setOrdered(true);
+ es.onEvent(e -> {
+ Instant endTime = e.getEndTime();
+ if (endTime.isBefore(timestamp.get())) {
+ throw new Error("Events are not ordered! Reues = " + reuse);
+ }
+ timestamp.set(endTime);
+ });
+ es.start();
+ }
}
}
private static void testSetOrderedFalse(Path p) throws Exception {
- AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
- AtomicBoolean unoreded = new AtomicBoolean(false);
- try (EventStream es = EventStream.openFile(p)) {
- es.setOrdered(false);
- es.onEvent(e -> {
- Instant endTime = e.getEndTime();
- if (endTime.isBefore(timestamp.get())) {
- unoreded.set(true);
- es.close();
+ for (boolean reuse : BOOLEAN_STATES) {
+ AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.MIN);
+ AtomicBoolean unoreded = new AtomicBoolean(false);
+ try (EventStream es = EventStream.openFile(p)) {
+ es.setReuse(reuse);
+ es.setOrdered(false);
+ es.onEvent(e -> {
+ Instant endTime = e.getEndTime();
+ if (endTime.isBefore(timestamp.get())) {
+ unoreded.set(true);
+ es.close();
+ }
+ timestamp.set(endTime);
+ });
+ es.start();
+ if (!unoreded.get()) {
+ throw new Exception("Expected at least some events to be out of order! Reues = " + reuse);
}
- timestamp.set(endTime);
- });
- es.start();
- if (!unoreded.get()) {
- throw new Exception("Expected at least some events to be out of order");
}
}
}
--- a/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java Thu May 30 23:12:44 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java Fri May 31 20:44:28 2019 +0200
@@ -31,7 +31,6 @@
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import jdk.jfr.Event;
import jdk.jfr.Recording;
@@ -51,6 +50,8 @@
static class ReuseEvent extends Event {
}
+ private static final boolean[] BOOLEAN_STATES = { false, true };
+
public static void main(String... args) throws Exception {
Path p = makeRecording();
@@ -59,54 +60,66 @@
}
private static void testSetReuseFalse(Path p) throws Exception {
- AtomicBoolean fail = new AtomicBoolean(false);
- Map<RecordedEvent, RecordedEvent> identity = new IdentityHashMap<>();
- try (EventStream es = EventStream.openFile(p)) {
- es.setReuse(false);
- es.onEvent(e -> {
- if (identity.containsKey(e)) {
- fail.set(true);
- es.close();
- }
- identity.put(e, e);
- });
- es.start();
- }
- if (fail.get()) {
- throw new Exception("Unexpected reuse!");
+ for (boolean ordered : BOOLEAN_STATES) {
+ AtomicBoolean fail = new AtomicBoolean(false);
+ Map<RecordedEvent, RecordedEvent> identity = new IdentityHashMap<>();
+ try (EventStream es = EventStream.openFile(p)) {
+ es.setOrdered(ordered);
+ es.setReuse(false);
+ es.onEvent(e -> {
+ if (identity.containsKey(e)) {
+ fail.set(true);
+ es.close();
+ }
+ identity.put(e, e);
+ });
+ es.start();
+ }
+ if (fail.get()) {
+ throw new Exception("Unexpected reuse! Ordered = " + ordered);
+ }
+
}
}
private static void testSetReuseTrue(Path p) throws Exception {
- AtomicBoolean fail = new AtomicBoolean(false);
- AtomicReference<RecordedEvent> event = new AtomicReference<RecordedEvent>(null);
- try (EventStream es = EventStream.openFile(p)) {
- es.setReuse(true);
- es.onEvent(e -> {
- if (event.get() == null) {
- event.set(e);
- } else {
- if (e != event.get()) {
- fail.set(true);
+ for (boolean ordered : BOOLEAN_STATES) {
+ AtomicBoolean success = new AtomicBoolean(false);
+ Map<RecordedEvent, RecordedEvent> events = new IdentityHashMap<>();
+ try (EventStream es = EventStream.openFile(p)) {
+ es.setOrdered(ordered);
+ es.setReuse(true);
+ es.onEvent(e -> {
+ if(events.containsKey(e)) {
+ success.set(true);;
es.close();
}
- }
- });
- es.start();
+ events.put(e,e);
+ });
+ es.start();
+ }
+ if (!success.get()) {
+ throw new Exception("No reuse! Ordered = " + ordered);
+ }
}
- if (fail.get()) {
- throw new Exception("No reuse!");
- }
+
}
private static Path makeRecording() throws IOException {
try (Recording r = new Recording()) {
r.start();
- for (int i = 0; i < 1_000; i++) {
+ for (int i = 0; i < 5; i++) {
+ ReuseEvent e = new ReuseEvent();
+ e.commit();
+ }
+ Recording rotation = new Recording();
+ rotation.start();
+ for (int i = 0; i < 5; i++) {
ReuseEvent e = new ReuseEvent();
e.commit();
}
r.stop();
+ rotation.close();
Path p = Files.createTempFile("recording", ".jfr");
r.dump(p);
return p;