# HG changeset patch # User egahlin # Date 1561557887 -7200 # Node ID ba454a26d2c19c7a59454a0061fa9ba7c3b0ae71 # Parent ef3e241c420fbe3b21f378a9d9a6f959d88f309e SetOrdered and setReuse now work on a directory stream diff -r ef3e241c420f -r ba454a26d2c1 src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Tue Jun 25 14:02:06 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Wed Jun 26 16:04:47 2019 +0200 @@ -71,7 +71,7 @@ } public ChunkParser(ChunkParser previous) throws IOException { - this(new ChunkHeader(previous.input), null, 500); + this(new ChunkHeader(previous.input), previous, 500); } private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException { diff -r ef3e241c420f -r ba454a26d2c1 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Tue Jun 25 14:02:06 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Wed Jun 26 16:04:47 2019 +0200 @@ -54,6 +54,11 @@ import jdk.jfr.internal.consumer.EventConsumer; import jdk.jfr.internal.consumer.RecordingInput; +/** + * Implementation of an {@code EventStream}} that operates against a directory + * with chunk files. + * + */ final class EventDirectoryStream implements EventStream { private static final class RepositoryFiles { @@ -67,7 +72,7 @@ } long getTimestamp(Path p) { - return pathLookup.get(p); + return pathLookup.get(p); } Path nextPath(long startTimeNanos) { @@ -168,9 +173,7 @@ private static final int DEFAULT_ARRAY_SIZE = 10_000; private final RepositoryFiles repositoryFiles; private ChunkParser chunkParser; - private boolean reuse = true; private RecordedEvent[] sortedList; - private boolean ordered = true; public ParserConsumer(AccessControlContext acc, Path p) throws IOException { super(acc); @@ -200,6 +203,9 @@ } path = repositoryFiles.nextPath(startNanos); + if (path == null) { + return; // stream closed + } startNanos = repositoryFiles.getTimestamp(path) + 1; input.setFile(path); chunkParser = chunkParser.newChunkParser(); diff -r ef3e241c420f -r ba454a26d2c1 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Tue Jun 25 14:02:06 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Wed Jun 26 16:04:47 2019 +0200 @@ -53,7 +53,6 @@ private RecordedEvent[] sortedList; private boolean ordered; - public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException { super(acc); this.input = input; @@ -68,8 +67,8 @@ chunkParser.setReuse(reuse); chunkParser.setOrdered(ordered); chunkParser.resetEventCache(); + chunkParser.setParserFilter(eventFilter); chunkParser.updateEventParsers(); - if (ordered) { processOrdered(); } else { @@ -78,6 +77,7 @@ if (chunkParser.isLastChunk()) { return; } + runFlushActions(); chunkParser = chunkParser.nextChunkParser(); } } diff -r ef3e241c420f -r ba454a26d2c1 src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Tue Jun 25 14:02:06 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Wed Jun 26 16:04:47 2019 +0200 @@ -331,7 +331,7 @@ @Override public void setReuse(boolean reuse) { - throw new UnsupportedOperationException("Unordered not implemented yet"); + stream.setReuse(reuse); } @Override diff -r ef3e241c420f -r ba454a26d2c1 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Tue Jun 25 14:02:06 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Wed Jun 26 16:04:47 2019 +0200 @@ -78,6 +78,8 @@ private final static VarHandle dispatcherHandle; private final static VarHandle flushActionsHandle; private final static VarHandle closeActionsHandle; + private final static VarHandle orderedHandle; + private final static VarHandle reuseHandle; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); @@ -86,6 +88,9 @@ dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class); flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class); closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class); + orderedHandle = l.findVarHandle(EventConsumer.class, "ordered", boolean.class); + reuseHandle = l.findVarHandle(EventConsumer.class, "reuse", boolean.class); + } catch (ReflectiveOperationException e) { throw new InternalError(e); } @@ -101,6 +106,10 @@ // set by VarHandle private Runnable[] closeActions = new Runnable[0]; + protected boolean ordered = true; + + protected boolean reuse = true; + protected InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; private final AccessControlContext accessControlContext; @@ -329,11 +338,11 @@ abstract public void close(); public void setReuse(boolean reuse) { - + reuseHandle.setVolatile(this, reuse); } public void setOrdered(boolean ordered) { - + orderedHandle.setVolatile(this, ordered); } } \ No newline at end of file diff -r ef3e241c420f -r ba454a26d2c1 test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java --- a/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Tue Jun 25 14:02:06 2019 +0200 +++ b/test/jdk/jdk/jfr/api/consumer/filestream/TestOrdered.java Wed Jun 26 16:04:47 2019 +0200 @@ -92,7 +92,7 @@ es.onEvent(e -> { Instant endTime = e.getEndTime(); if (endTime.isBefore(timestamp.get())) { - throw new Error("Events are not ordered! Reues = " + reuse); + throw new Error("Events are not ordered! Reuse = " + reuse); } timestamp.set(endTime); }); @@ -118,7 +118,7 @@ }); es.start(); if (!unoreded.get()) { - throw new Exception("Expected at least some events to be out of order! Reues = " + reuse); + throw new Exception("Expected at least some events to be out of order! Reuse = " + reuse); } } }