# HG changeset patch # User egahlin # Date 1559328268 -7200 # Node ID 7d9d4f629f6e487124dd277ddcb9692556095846 # Parent ce265e404c64d612c35489e0aa37c70f4f3655b4 Make setReuse and setOrdered work across chunk boundaries. Improved unit tests diff -r ce265e404c64 -r 7d9d4f629f6e src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Thu May 30 23:12:44 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri May 31 20:44:28 2019 +0200 @@ -82,6 +82,9 @@ this.constantLookups = previous.constantLookups; this.previousMetadata = previous.metadata; this.pollInterval = previous.pollInterval; + this.ordered = previous.ordered; + this.reuse = previous.reuse; + this.eventFilter = previous.eventFilter; } this.metadata = header.readMetadata(previousMetadata); this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset()); @@ -89,11 +92,11 @@ ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); parsers = factory.getParsers(); typeMap = factory.getTypeMap(); + updateEventParsers(); } else { parsers = previous.parsers; typeMap = previous.typeMap; } - updateEventParsers(); constantLookups.forEach(c -> c.newPool()); fillConstantPools(0); constantLookups.forEach(c -> c.getLatestPool().setResolving()); diff -r ce265e404c64 -r 7d9d4f629f6e src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu May 30 23:12:44 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri May 31 20:44:28 2019 +0200 @@ -47,7 +47,7 @@ super(acc); } - public void process() throws Exception, IOException { + public void process() throws IOException { this.location = EventSetLocation.current(); this.eventSet = location.acquire(startNanos, null); // use timestamp // from diff -r ce265e404c64 -r 7d9d4f629f6e src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu May 30 23:12:44 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri May 31 20:44:28 2019 +0200 @@ -52,7 +52,7 @@ private boolean reuse = true; private RecordedEvent[] sortedList; private boolean ordered; - private boolean finished; + public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException { super(acc); @@ -60,12 +60,11 @@ } @Override - public void process() throws Exception { + public void process() throws IOException { chunkParser = new ChunkParser(input, reuse); - while (!isClosed() && !finished) { + while (!isClosed()) { boolean reuse = this.reuse; boolean ordered = this.ordered; - chunkParser.setReuse(reuse); chunkParser.setOrdered(ordered); chunkParser.resetEventCache(); @@ -76,9 +75,14 @@ } else { processUnordered(); } + if (chunkParser.isLastChunk()) { + return; + } + chunkParser = chunkParser.nextChunkParser(); } } + private void processOrdered() throws IOException { if (sortedList == null) { sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE]; @@ -92,11 +96,7 @@ for (int i = 0; i < index; i++) { dispatch(sortedList[i]); } - event = findNext(); - if (event == null) { - finished = true; - return; - } + return; } if (index == sortedList.length) { RecordedEvent[] tmp = sortedList; @@ -112,28 +112,12 @@ while (!isClosed()) { event = chunkParser.readEvent(); if (event == null) { - event = findNext(); - if (event == null) { - finished = true; - return; - } + return; } dispatch(event); } } - private RecordedEvent findNext() throws IOException { - RecordedEvent event = null; - while (event == null) { - if (chunkParser.isLastChunk()) { - return null; - } - chunkParser = chunkParser.nextChunkParser(); - event = chunkParser.readEvent(); - } - return event; - } - public void setReuse(boolean reuse) { this.reuse = reuse; } diff -r ce265e404c64 -r 7d9d4f629f6e src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Thu May 30 23:12:44 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Fri May 31 20:44:28 2019 +0200 @@ -171,6 +171,10 @@ } public void setOrdered(boolean ordered) { + if (this.ordered == ordered) { + return; + } this.ordered = ordered; + this.index = 0; } } diff -r ce265e404c64 -r 7d9d4f629f6e src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java Thu May 30 23:12:44 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java Fri May 31 20:44:28 2019 +0200 @@ -138,14 +138,14 @@ dirtyFilter = true; } - public RecordedEvent[] readEvents(int index) throws Exception { + public RecordedEvent[] readEvents(int index) throws IOException { while (!closed) { RecordedEvent[] events = (RecordedEvent[]) segments[index]; if (events != null) { return events; } - if (lock.tryLock(250, TimeUnit.MILLISECONDS)) { + if (await()) { try { addSegment(index); } finally { @@ -156,6 +156,14 @@ return null; } + private boolean await() { + try { + return lock.tryLock(250, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return false; + } + } + // held with lock private void addSegment(int index) throws IOException { if (chunkParser == null) { diff -r ce265e404c64 -r 7d9d4f629f6e src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Thu May 30 23:12:44 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Fri May 31 20:44:28 2019 +0200 @@ -131,15 +131,24 @@ jvm.exclude(Thread.currentThread()); try { process(); + } catch (IOException e) { + if (!isClosed()) { + logException(e); + } } catch (Exception e) { - e.printStackTrace(); // for debugging purposes, remove before integration - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage()); + logException(e); } finally { Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); } } - public abstract void process() throws Exception; + private void logException(Exception e) { + e.printStackTrace(); // for debugging purposes, remove before + // integration + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage()); + } + + public abstract void process() throws IOException; public synchronized boolean remove(Object action) { boolean remove = false; diff -r ce265e404c64 -r 7d9d4f629f6e test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java --- a/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Thu May 30 23:12:44 2019 +0200 +++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Fri May 31 20:44:28 2019 +0200 @@ -74,6 +74,7 @@ } private static final int THREAD_COUNT = 4; + private static final boolean[] BOOLEAN_STATES = { false, true }; public static void main(String... args) throws Exception { Path p = makeUnorderedRecording(); @@ -83,36 +84,42 @@ } private static void testSetOrderedTrue(Path p) throws Exception { - AtomicReference timestamp = new AtomicReference<>(Instant.MIN); - try (EventStream es = EventStream.openFile(p)) { - es.setOrdered(true); - es.onEvent(e -> { - Instant endTime = e.getEndTime(); - if (endTime.isBefore(timestamp.get())) { - throw new Error("Events are not ordered!"); - } - timestamp.set(endTime); - }); - es.start(); + for (boolean reuse : BOOLEAN_STATES) { + AtomicReference timestamp = new AtomicReference<>(Instant.MIN); + try (EventStream es = EventStream.openFile(p)) { + es.setReuse(reuse); + es.setOrdered(true); + es.onEvent(e -> { + Instant endTime = e.getEndTime(); + if (endTime.isBefore(timestamp.get())) { + throw new Error("Events are not ordered! Reues = " + reuse); + } + timestamp.set(endTime); + }); + es.start(); + } } } private static void testSetOrderedFalse(Path p) throws Exception { - AtomicReference timestamp = new AtomicReference<>(Instant.MIN); - AtomicBoolean unoreded = new AtomicBoolean(false); - try (EventStream es = EventStream.openFile(p)) { - es.setOrdered(false); - es.onEvent(e -> { - Instant endTime = e.getEndTime(); - if (endTime.isBefore(timestamp.get())) { - unoreded.set(true); - es.close(); + for (boolean reuse : BOOLEAN_STATES) { + AtomicReference timestamp = new AtomicReference<>(Instant.MIN); + AtomicBoolean unoreded = new AtomicBoolean(false); + try (EventStream es = EventStream.openFile(p)) { + es.setReuse(reuse); + es.setOrdered(false); + es.onEvent(e -> { + Instant endTime = e.getEndTime(); + if (endTime.isBefore(timestamp.get())) { + unoreded.set(true); + es.close(); + } + timestamp.set(endTime); + }); + es.start(); + if (!unoreded.get()) { + throw new Exception("Expected at least some events to be out of order! Reues = " + reuse); } - timestamp.set(endTime); - }); - es.start(); - if (!unoreded.get()) { - throw new Exception("Expected at least some events to be out of order"); } } } diff -r ce265e404c64 -r 7d9d4f629f6e test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java --- a/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java Thu May 30 23:12:44 2019 +0200 +++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java Fri May 31 20:44:28 2019 +0200 @@ -31,7 +31,6 @@ import java.util.IdentityHashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import jdk.jfr.Event; import jdk.jfr.Recording; @@ -51,6 +50,8 @@ static class ReuseEvent extends Event { } + private static final boolean[] BOOLEAN_STATES = { false, true }; + public static void main(String... args) throws Exception { Path p = makeRecording(); @@ -59,54 +60,66 @@ } private static void testSetReuseFalse(Path p) throws Exception { - AtomicBoolean fail = new AtomicBoolean(false); - Map identity = new IdentityHashMap<>(); - try (EventStream es = EventStream.openFile(p)) { - es.setReuse(false); - es.onEvent(e -> { - if (identity.containsKey(e)) { - fail.set(true); - es.close(); - } - identity.put(e, e); - }); - es.start(); - } - if (fail.get()) { - throw new Exception("Unexpected reuse!"); + for (boolean ordered : BOOLEAN_STATES) { + AtomicBoolean fail = new AtomicBoolean(false); + Map identity = new IdentityHashMap<>(); + try (EventStream es = EventStream.openFile(p)) { + es.setOrdered(ordered); + es.setReuse(false); + es.onEvent(e -> { + if (identity.containsKey(e)) { + fail.set(true); + es.close(); + } + identity.put(e, e); + }); + es.start(); + } + if (fail.get()) { + throw new Exception("Unexpected reuse! Ordered = " + ordered); + } + } } private static void testSetReuseTrue(Path p) throws Exception { - AtomicBoolean fail = new AtomicBoolean(false); - AtomicReference event = new AtomicReference(null); - try (EventStream es = EventStream.openFile(p)) { - es.setReuse(true); - es.onEvent(e -> { - if (event.get() == null) { - event.set(e); - } else { - if (e != event.get()) { - fail.set(true); + for (boolean ordered : BOOLEAN_STATES) { + AtomicBoolean success = new AtomicBoolean(false); + Map events = new IdentityHashMap<>(); + try (EventStream es = EventStream.openFile(p)) { + es.setOrdered(ordered); + es.setReuse(true); + es.onEvent(e -> { + if(events.containsKey(e)) { + success.set(true);; es.close(); } - } - }); - es.start(); + events.put(e,e); + }); + es.start(); + } + if (!success.get()) { + throw new Exception("No reuse! Ordered = " + ordered); + } } - if (fail.get()) { - throw new Exception("No reuse!"); - } + } private static Path makeRecording() throws IOException { try (Recording r = new Recording()) { r.start(); - for (int i = 0; i < 1_000; i++) { + for (int i = 0; i < 5; i++) { + ReuseEvent e = new ReuseEvent(); + e.commit(); + } + Recording rotation = new Recording(); + rotation.start(); + for (int i = 0; i < 5; i++) { ReuseEvent e = new ReuseEvent(); e.commit(); } r.stop(); + rotation.close(); Path p = Files.createTempFile("recording", ".jfr"); r.dump(p); return p;