# HG changeset patch # User egahlin # Date 1559250290 -7200 # Node ID 6a7e7743b82fcecc9ac0f275b93508283d680762 # Parent 2e35a025ebee282a0cb1e1896c31713ee6029f17 setOrdered and setReuse implemented for file stream, incl. unit tests diff -r 2e35a025ebee -r 6a7e7743b82f src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Mon May 27 23:05:54 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Thu May 30 23:04:50 2019 +0200 @@ -62,16 +62,14 @@ private boolean chunkFinished; private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; private boolean reuse; + private boolean ordered; + private boolean resetEventCache; public ChunkParser(RecordingInput input, boolean reuse) throws IOException { this(new ChunkHeader(input), null, 500); this.reuse = reuse; } - public void setReuse(boolean resue) { - this.reuse = resue; - updateParsers(); - } private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException { this.input = header.getInput(); @@ -95,7 +93,7 @@ parsers = previous.parsers; typeMap = previous.typeMap; } - updateParsers(); + updateEventParsers(); constantLookups.forEach(c -> c.newPool()); fillConstantPools(0); constantLookups.forEach(c -> c.getLatestPool().setResolving()); @@ -116,23 +114,7 @@ return this.eventFilter; } - private void updateParsers() { - parsers.forEach(p -> { - if (p instanceof EventParser) { - EventParser ep = (EventParser) p; - if (reuse) { - ep.setReuse(true); - } - long threshold = eventFilter.getThreshold(ep.getEventType().getName()); - if (threshold >= 0) { - ep.setEnabled(true); - ep.setThreshold(timeConverter.convertDurationNanos(threshold)); - } else { - ep.setThreshold(-1L); - } - } - }); - } + /** * Reads an event and returns null when segment or chunk ends. @@ -163,7 +145,7 @@ ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); parsers = factory.getParsers(); typeMap = factory.getTypeMap(); - updateParsers(); + updateEventParsers(); } if (contantPosition != chunkHeader.getConstantPoolPosition()) { Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values"); @@ -356,4 +338,43 @@ public boolean isChunkFinished() { return chunkFinished; } + + // Need to call updateEventParsers() for + // change to take effect + public void setReuse(boolean resue) { + this.reuse = resue; + } + + // Need to call updateEventParsers() for + // change to take effect + public void setOrdered(boolean ordered) { + this.ordered = ordered; + } + + // Need to call updateEventParsers() for + // change to take effect + public void resetEventCache() { + this.resetEventCache = true; + } + + public void updateEventParsers() { + parsers.forEach(p -> { + if (p instanceof EventParser) { + EventParser ep = (EventParser) p; + ep.setOrdered(ordered); + ep.setReuse(reuse); + if (resetEventCache) { + ep.resetCache(); + } + long threshold = eventFilter.getThreshold(ep.getEventType().getName()); + if (threshold >= 0) { + ep.setEnabled(true); + ep.setThreshold(timeConverter.convertDurationNanos(threshold)); + } else { + ep.setThreshold(-1L); + } + } + }); + resetEventCache = false; + } } diff -r 2e35a025ebee -r 6a7e7743b82f src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon May 27 23:05:54 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu May 30 23:04:50 2019 +0200 @@ -181,4 +181,11 @@ public void setReuse(boolean reuse) { eventConsumer.setReuse(reuse); } + + @Override + public void setOrdered(boolean ordered) { + if (ordered == false) { + throw new UnsupportedOperationException("Unordered not implemented yet"); + } + } } diff -r 2e35a025ebee -r 6a7e7743b82f src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Mon May 27 23:05:54 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu May 30 23:04:50 2019 +0200 @@ -30,6 +30,8 @@ import java.security.AccessControlContext; import java.security.AccessController; import java.time.Duration; +import java.util.Arrays; +import java.util.Comparator; import java.util.Objects; import java.util.function.Consumer; @@ -43,9 +45,14 @@ final class EventFileStream implements EventStream { private final static class FileEventConsumer extends EventConsumer { + private static final Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime); + private static final int DEFAULT_ARRAY_SIZE = 100_000; private final RecordingInput input; private ChunkParser chunkParser; private boolean reuse = true; + private RecordedEvent[] sortedList; + private boolean ordered; + private boolean finished; public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException { super(acc); @@ -55,13 +62,59 @@ @Override public void process() throws Exception { chunkParser = new ChunkParser(input, reuse); - chunkParser.setReuse(reuse); + while (!isClosed() && !finished) { + boolean reuse = this.reuse; + boolean ordered = this.ordered; + + chunkParser.setReuse(reuse); + chunkParser.setOrdered(ordered); + chunkParser.resetEventCache(); + chunkParser.updateEventParsers(); + + if (ordered) { + processOrdered(); + } else { + processUnordered(); + } + } + } + + private void processOrdered() throws IOException { + if (sortedList == null) { + sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE]; + } RecordedEvent event; + int index = 0; while (true) { event = chunkParser.readEvent(); if (event == null) { + Arrays.sort(sortedList, 0, index, END_TIME); + for (int i = 0; i < index; i++) { + dispatch(sortedList[i]); + } + event = findNext(); + if (event == null) { + finished = true; + return; + } + } + if (index == sortedList.length) { + RecordedEvent[] tmp = sortedList; + sortedList = new RecordedEvent[2 * tmp.length]; + System.arraycopy(tmp, 0, sortedList, 0, tmp.length); + } + sortedList[index++] = event; + } + } + + private void processUnordered() throws IOException { + RecordedEvent event; + while (!isClosed()) { + event = chunkParser.readEvent(); + if (event == null) { event = findNext(); if (event == null) { + finished = true; return; } } @@ -82,11 +135,11 @@ } public void setReuse(boolean reuse) { - if (chunkParser == null) { - this.reuse = reuse; - } else { - chunkParser.setReuse(reuse); - } + this.reuse = reuse; + } + + public void setOrdered(boolean ordered) { + this.ordered = ordered; } } @@ -165,4 +218,9 @@ public void awaitTermination() { eventConsumer.awaitTermination(); } + + @Override + public void setOrdered(boolean ordered) { + eventConsumer.setOrdered(ordered); + } } diff -r 2e35a025ebee -r 6a7e7743b82f src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Mon May 27 23:05:54 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Thu May 30 23:04:50 2019 +0200 @@ -46,10 +46,11 @@ private final boolean hasDuration; private final List valueDescriptors; private final int startIndex; - private final RecordedEvent event; private long thresholdTicks = -1; private boolean enabled = true; - private boolean reuse; + private RecordedEvent[] eventCache; + private int index; + private boolean ordered; EventParser(TimeConverter timeConverter, EventType type, Parser[] parsers) { this.timeConverter = timeConverter; @@ -58,7 +59,23 @@ this.hasDuration = type.getField(FIELD_DURATION) != null; this.startIndex = hasDuration ? 2 : 1; this.valueDescriptors = type.getFields(); - this.event = new RecordedEvent(type, valueDescriptors, new Object[parsers.length], 0L, 0L, timeConverter); + } + + private RecordedEvent cachedEvent() { + if (index == eventCache.length) { + RecordedEvent[] cache = eventCache; + eventCache = new RecordedEvent[eventCache.length * 2]; + System.arraycopy(cache, 0, eventCache, 0, cache.length); + } + RecordedEvent event = eventCache[index]; + if (event == null) { + event = new RecordedEvent(eventType, valueDescriptors, new Object[parsers.length], 0L, 0L, timeConverter); + eventCache[index] = event; + } + if (ordered) { + index++; + } + return event; } public EventType getEventType() { @@ -87,7 +104,8 @@ return null; } } - if (reuse) { + if (eventCache != null) { + RecordedEvent event = cachedEvent(); Object[] values = event.objects; for (int i = startIndex; i < parsers.length; i++) { values[i] = parsers[i].parse(input); @@ -124,7 +142,7 @@ } } } - return event; + return null; } @Override @@ -132,8 +150,27 @@ throw new InternalError("Should not call this method. More efficent to read event size and skip ahead"); } - public void setReuse(boolean reuse) { - this.reuse = reuse; + public void resetCache() { + index = 0; + } + + public boolean hasReuse() { + return eventCache != null; } + public void setReuse(boolean reuse) { + if (reuse == hasReuse()) { + return; + } + if (reuse) { + eventCache = new RecordedEvent[2]; + index = 0; + } else { + eventCache = null; + } + } + + public void setOrdered(boolean ordered) { + this.ordered = ordered; + } } diff -r 2e35a025ebee -r 6a7e7743b82f src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Mon May 27 23:05:54 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Thu May 30 23:04:50 2019 +0200 @@ -139,19 +139,32 @@ boolean remove(Object action); /** - * Hint that the the event object in an {@link #onEvent(Consumer)} action + * Hint that the event object in an {@link #onEvent(Consumer)} action * may be reused. *

- * If reuse is set to - * {@code true), a callback should not keep a reference to the event object - * after the callback from {@code onEvent} has returned. + * If reuse is set to {@code true), a callback should not keep a reference + * to the event object after the callback from {@code onEvent} has returned. *

- * By default reuse is {@code true} + * By default reuse is set to {@code true} + * + * @param resuse if event objects can be reused between calls to {@code #onEvent(Consumer)} * */ public void setReuse(boolean reuse); /** + * Orders events in chronological order aft the end timestamp + * + * TODO: WHAT ABOUT EVENTS THAT OCCUR WAY OUT OF ORDER + *

+ * By default ordered is set to {@code true} + * + * @param ordered if event objects arrive in chronological order to {@code #onEvent(Consumer)} + */ + public void setOrdered(boolean ordered); + + + /** * Starts processing events in the stream. *

* All actions will performed on this stream will happen in the current diff -r 2e35a025ebee -r 6a7e7743b82f src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Mon May 27 23:05:54 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Thu May 30 23:04:50 2019 +0200 @@ -299,6 +299,11 @@ @Override public void setReuse(boolean reuse) { - // hint is ignored + throw new UnsupportedOperationException("Unordered not implemented yet"); + } + + @Override + public void setOrdered(boolean ordered) { + stream.setOrdered(ordered); } } diff -r 2e35a025ebee -r 6a7e7743b82f src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Mon May 27 23:05:54 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Thu May 30 23:04:50 2019 +0200 @@ -131,9 +131,9 @@ jvm.exclude(Thread.currentThread()); try { process(); - } catch (Throwable e) { - e.printStackTrace(); - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpectedexception iterating consumer."); + } catch (Exception e) { + e.printStackTrace(); // for debugging purposes, remove before integration + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage()); } finally { Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); } @@ -185,7 +185,12 @@ dispatcher.put(e.getEventType().getId(), consumerDispatch); } for (int i = 0; i < consumerDispatch.length; i++) { - consumerDispatch[i].offer(e); + try { + consumerDispatch[i].offer(e); + } catch (Exception exception) { + // Is this a reasonable behavior for an exception? + // Error will abort the stream. + } } } diff -r 2e35a025ebee -r 6a7e7743b82f test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Thu May 30 23:04:50 2019 +0200 @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.jfr.api.consumer.filestream; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import jdk.jfr.Event; +import jdk.jfr.Recording; +import jdk.jfr.consumer.EventStream; + +/** + * @test + * @summary Test EventStream::setOrdered(...) + * @key jfr + * @requires vm.hasJFR + * @library /test/lib + * @run main/othervm jdk.jfr.api.consumer.filestream.TestOrdered + */ +public class TestOrdered { + + static class OrderedEvent extends Event { + } + + static class Emitter extends Thread { + private final CyclicBarrier barrier; + + public Emitter(CyclicBarrier barrier) { + this.barrier = barrier; + } + + @Override + public void run() { + OrderedEvent e1 = new OrderedEvent(); + e1.commit(); + try { + barrier.await(); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Unexpected exception in barrier"); + } + OrderedEvent e2 = new OrderedEvent(); + e2.commit(); + } + } + + private static final int THREAD_COUNT = 4; + + public static void main(String... args) throws Exception { + Path p = makeUnorderedRecording(); + + testSetOrderedTrue(p); + testSetOrderedFalse(p); + } + + private static void testSetOrderedTrue(Path p) throws Exception { + AtomicReference timestamp = new AtomicReference<>(Instant.MIN); + try (EventStream es = EventStream.openFile(p)) { + es.setOrdered(true); + es.onEvent(e -> { + Instant endTime = e.getEndTime(); + if (endTime.isBefore(timestamp.get())) { + throw new Error("Events are not ordered!"); + } + timestamp.set(endTime); + }); + es.start(); + } + } + + private static void testSetOrderedFalse(Path p) throws Exception { + AtomicReference timestamp = new AtomicReference<>(Instant.MIN); + AtomicBoolean unoreded = new AtomicBoolean(false); + try (EventStream es = EventStream.openFile(p)) { + es.setOrdered(false); + es.onEvent(e -> { + Instant endTime = e.getEndTime(); + if (endTime.isBefore(timestamp.get())) { + unoreded.set(true); + es.close(); + } + timestamp.set(endTime); + }); + es.start(); + if (!unoreded.get()) { + throw new Exception("Expected at least some events to be out of order"); + } + } + } + + private static Path makeUnorderedRecording() throws Exception { + CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT + 1); + + try (Recording r = new Recording()) { + r.start(); + List emitters = new ArrayList<>(); + for (int i = 0; i < THREAD_COUNT; i++) { + Emitter e = new Emitter(barrier); + e.start(); + emitters.add(e); + } + // Thread buffers should now have one event each + barrier.await(); + // Add another event to each thread buffer, so + // events are bound to come out of order when they + // are flushed + for (Emitter e : emitters) { + e.join(); + } + r.stop(); + Path p = Files.createTempFile("recording", ".jfr"); + r.dump(p); + return p; + } + } +} diff -r 2e35a025ebee -r 6a7e7743b82f test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java --- a/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java Mon May 27 23:05:54 2019 +0200 +++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestReuse.java Thu May 30 23:04:50 2019 +0200 @@ -31,6 +31,7 @@ import java.util.IdentityHashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import jdk.jfr.Event; import jdk.jfr.Recording; @@ -67,34 +68,34 @@ fail.set(true); throw new Error("Unexpected reuse!"); } - identity.put(e,e); + identity.put(e, e); }); es.start(); } if (fail.get()) { - throw new Exception("Unexpected resued"); + throw new Exception("Unexpected reuse!"); } } private static void testSetReuseTrue(Path p) throws Exception { AtomicBoolean fail = new AtomicBoolean(false); + AtomicReference event = new AtomicReference(null); try (EventStream es = EventStream.openFile(p)) { es.setReuse(true); - RecordedEvent[] events = new RecordedEvent[1]; es.onEvent(e -> { - if (events[0] == null) { - events[0] = e; + if (event.get() == null) { + event.set(e); } else { - if (e != events[0]) { + if (e != event.get()) { fail.set(true); - throw new Error("No reuse"); + throw new Error("No reuse!"); } } }); es.start(); } if (fail.get()) { - throw new Exception("No reuse"); + throw new Exception("No reuse!"); } }