# HG changeset patch # User egahlin # Date 1561623572 -7200 # Node ID 83e4343a69846e8a57f5a03a23f5b4e2992dfe9f # Parent ba454a26d2c19c7a59454a0061fa9ba7c3b0ae71 Clean up and fix parser level filtering diff -r ba454a26d2c1 -r 83e4343a6984 src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Wed Jun 26 16:04:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Thu Jun 27 10:19:32 2019 +0200 @@ -367,17 +367,19 @@ parsers.forEach(p -> { if (p instanceof EventParser) { EventParser ep = (EventParser) p; + String name = ep.getEventType().getName(); ep.setOrdered(ordered); ep.setReuse(reuse); if (resetEventCache) { ep.resetCache(); } - long threshold = eventFilter.getThreshold(ep.getEventType().getName()); + long threshold = eventFilter.getThreshold(name); if (threshold >= 0) { ep.setEnabled(true); ep.setThreshold(timeConverter.convertDurationNanos(threshold)); } else { - ep.setThreshold(-1L); + ep.setEnabled(false); + ep.setThreshold(Long.MAX_VALUE); } } }); diff -r ba454a26d2c1 -r 83e4343a6984 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Wed Jun 26 16:04:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Jun 27 10:19:32 2019 +0200 @@ -26,33 +26,18 @@ package jdk.jfr.consumer; import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; import java.nio.file.Path; import java.security.AccessControlContext; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.function.Consumer; -import jdk.jfr.internal.LogLevel; -import jdk.jfr.internal.LogTag; -import jdk.jfr.internal.Logger; -import jdk.jfr.internal.Repository; -import jdk.jfr.internal.consumer.ChunkHeader; import jdk.jfr.internal.consumer.EventConsumer; import jdk.jfr.internal.consumer.RecordingInput; +import jdk.jfr.internal.consumer.RepositoryFiles; /** * Implementation of an {@code EventStream}} that operates against a directory @@ -61,112 +46,6 @@ */ final class EventDirectoryStream implements EventStream { - private static final class RepositoryFiles { - private final Path repostory; - private final SortedMap pathSet = new TreeMap<>(); - private final Map pathLookup = new HashMap<>(); - private volatile boolean closed; - - public RepositoryFiles(Path repostory) { - this.repostory = repostory; - } - - long getTimestamp(Path p) { - return pathLookup.get(p); - } - - Path nextPath(long startTimeNanos) { - while (!closed) { - SortedMap after = pathSet.tailMap(startTimeNanos); - if (!after.isEmpty()) { - Path path = after.get(after.firstKey()); - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos); - return path; - } - try { - if (updatePaths(repostory)) { - continue; - } - } catch (IOException e) { - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during repository file scan " + e.getMessage()); - // This can happen if a chunk is being removed - // between the file was discovered and an instance - // of an EventSet was constructed. Just ignore, - // and retry later. - } - try { - synchronized (pathSet) { - pathSet.wait(1000); - } - } catch (InterruptedException e) { - // ignore - } - } - return null; - } - - private boolean updatePaths(Path repo) throws IOException { - if (repo == null) { - repo = Repository.getRepository().getRepositoryPath().toPath(); - } - boolean foundNew = false; - List added = new ArrayList<>(); - Set current = new HashSet<>(); - if (!Files.exists(repo)) { - // Repository removed, probably due to shutdown - return true; - } - try (DirectoryStream dirStream = Files.newDirectoryStream(repo, "*.jfr")) { - for (Path p : dirStream) { - if (!pathLookup.containsKey(p)) { - added.add(p); - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath()); - } - current.add(p); - } - } - List removed = new ArrayList<>(); - for (Path p : pathLookup.keySet()) { - if (!current.contains(p)) { - removed.add(p); - } - } - - for (Path remove : removed) { - Long time = pathLookup.get(remove); - pathSet.remove(time); - pathLookup.remove(remove); - } - Collections.sort(added, (p1, p2) -> p1.compareTo(p2)); - for (Path p : added) { - // Only add files that have a complete header - // as the JVM may be in progress writing the file - long size = Files.size(p); - if (size >= ChunkHeader.HEADER_SIZE) { - long startNanos = readStartTime(p); - pathSet.put(startNanos, p); - pathLookup.put(p, startNanos); - foundNew = true; - } - } - return foundNew; - } - - private long readStartTime(Path p) throws IOException { - try (RecordingInput in = new RecordingInput(p.toFile(), 100)) { - ChunkHeader c = new ChunkHeader(in); - return c.getStartNanos(); - } - } - - public void close() { - synchronized (pathSet) { - this.closed = true; - pathSet.notify(); - } - } - } - static final class ParserConsumer extends EventConsumer { private static final Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime); @@ -260,92 +139,12 @@ return awaitNewEvents; } - public void setReuse(boolean reuse) { - this.reuse = reuse; - } - - public void setOrdered(boolean ordered) { - this.ordered = ordered; - } - @Override public void close() { repositoryFiles.close(); } } - static final class SharedParserConsumer extends EventConsumer { - private EventSetLocation location; - private EventSet eventSet; - private int eventSetIndex; - private int eventArrayIndex; - private RecordedEvent[] currentEventArray = new RecordedEvent[0]; - - public SharedParserConsumer(AccessControlContext acc) throws IOException { - super(acc); - } - - public void process() throws IOException { - this.location = EventSetLocation.current(); - this.eventSet = location.acquire(startNanos, null); // use timestamp - // from - if (eventSet == null) { - return; - } - while (!isClosed()) { - processSegment(); - runFlushActions(); - do { - if (isClosed()) { - return; - } - currentEventArray = eventSet.readEvents(eventSetIndex); - if (currentEventArray == EventSet.END_OF_SET) { - eventSet = eventSet.next(eventFilter); - if (eventSet == null || isClosed()) { - return; - } - eventSetIndex = 0; - continue; - } - if (currentEventArray == null) { - return; // no more events - } - eventSetIndex++; - } while (currentEventArray.length == 0); - eventArrayIndex = 0; - } - } - - private void processSegment() { - while (eventArrayIndex < currentEventArray.length) { - RecordedEvent e = currentEventArray[eventArrayIndex++]; - if (e == null) { - return; - } - dispatch(e); - } - } - - public void close() { - setClosed(true); - // TODO: Data races here, must fix - synchronized (this) { - if (eventSet != null) { - eventSet.release(null); - } - if (location != null) { - location.release(); - } - } - runCloseActions(); - } - - public void setReuse(boolean reuse) { - // ignore hint - } - } - private final EventConsumer eventConsumer; public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException { diff -r ba454a26d2c1 -r 83e4343a6984 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Wed Jun 26 16:04:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu Jun 27 10:19:32 2019 +0200 @@ -82,7 +82,6 @@ } } - private void processOrdered() throws IOException { if (sortedList == null) { sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE]; @@ -188,6 +187,7 @@ eventConsumer.start(0); } + @Override public void setReuse(boolean reuse) { eventConsumer.setReuse(reuse); } diff -r ba454a26d2c1 -r 83e4343a6984 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java Wed Jun 26 16:04:47 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,241 +0,0 @@ -/* - * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.jfr.consumer; - -import java.io.IOException; -import java.nio.file.Path; -import java.time.Instant; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; - -import jdk.jfr.internal.consumer.ChunkHeader; -import jdk.jfr.internal.consumer.InternalEventFilter; -import jdk.jfr.internal.consumer.RecordingInput; - -/** - * Cache that represents all discovered events in a chunk. - * - */ -final class EventSet { - - public static final RecordedEvent[] END_OF_SET = new RecordedEvent[0]; - private static final AtomicInteger idCounter = new AtomicInteger(-1); - - private volatile Object[][] segments = new Object[1000][]; - private volatile boolean closed; - private final long startTimeNanos; - private final EventSetLocation location; - private final Path path; - private final int id; - - // Guarded by lock - private boolean awaitNewEvents; - private RecordingInput input; - private ChunkParser chunkParser; - private int referenceCount; - private final ReentrantLock lock = new ReentrantLock(); - private final Set filters = new HashSet<>(); - private InternalEventFilter globalFilter = InternalEventFilter.ACCEPT_ALL; - private boolean dirtyFilter = true; - - public void release(InternalEventFilter eventFilter) { - try { - lock.lock(); - filters.remove(eventFilter); - updateGlobalFilter(); - referenceCount--; - if (referenceCount == 0) { - closed = true; - if (input != null) { - try { - input.close(); - } catch (IOException e) { - // TODO: Flie locked by other process? - } - chunkParser = null; - input = null; - } - } - } finally { - lock.unlock(); - } - } - - public EventSet(EventSetLocation location, EventSet previousEventSet, Path p) throws IOException { - this.location = location; - this.path = p; - this.startTimeNanos = readStartTime(p); - this.id = idCounter.incrementAndGet(); - } - - private long readStartTime(Path p) throws IOException { - try (RecordingInput in = new RecordingInput(p.toFile(), 100)) { - ChunkHeader c = new ChunkHeader(in); - return c.getStartNanos(); - } - } - - Path getPath() { - return path; - } - - // TODO: Use binary search, must use lock - public int findIndex(Instant timestamp) { - int index = 0; - for (int i = 0; i < segments.length; i++) { - RecordedEvent[] events = (RecordedEvent[]) segments[i]; - if (events == null || events.length == 0) { - return Math.max(index - 1, 0); - } - RecordedEvent e = events[0]; // May not be sorted. - if (timestamp.isAfter(e.getEndTime())) { - return Math.max(index - 1, 0); - } - } - return segments.length; - } - - public void addFilter(InternalEventFilter filter) { - try { - lock.lock(); - filters.add(filter); - updateGlobalFilter(); - } finally { - lock.unlock(); - } - } - - // held with lock - private void updateGlobalFilter() { - globalFilter = InternalEventFilter.merge(filters); - dirtyFilter = true; - } - - public RecordedEvent[] readEvents(int index) throws IOException { - while (!closed) { - - RecordedEvent[] events = (RecordedEvent[]) segments[index]; - if (events != null) { - return events; - } - if (await()) { - try { - addSegment(index); - } finally { - lock.unlock(); - } - } - } - return null; - } - - private boolean await() { - try { - return lock.tryLock(250, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - return false; - } - } - - // held with lock - private void addSegment(int index) throws IOException { - if (chunkParser == null) { - chunkParser = new ChunkParser(new RecordingInput(path.toFile()), false); - } - if (dirtyFilter) { - chunkParser.setParserFilter(globalFilter); - } - if (segments[index] != null) { - return; - } - if (index == segments.length - 2) { - segments = Arrays.copyOf(segments, segments.length * 2); - } - RecordedEvent[] segment = new RecordedEvent[10]; - int i = 0; - while (true) { - RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); - if (e == null) { - // wait for new event with next call to readStreamingEvent() - awaitNewEvents = true; - break; - } - awaitNewEvents = false; - if (i == segment.length) { - segment = Arrays.copyOf(segment, segment.length * 2); - } - segment[i++] = e; - } - - // no events found - if (i == 0) { - if (chunkParser.isChunkFinished()) { - segments[index] = END_OF_SET; - return; - } - } - // at least 2 events, sort them - if (i > 1) { - Arrays.sort(segment, 0, i, (e1, e2) -> Long.compare(e1.endTime, e2.endTime)); - } - segments[index] = segment; - if (chunkParser.isChunkFinished()) { - segments[index + 1] = END_OF_SET; - } - } - - public long getStartTimeNanos() { - return startTimeNanos; - } - - public EventSet next(InternalEventFilter filter) throws IOException { - EventSet next = location.acquire(startTimeNanos + 1, this); - if (next == null) { - // closed - return null; - } - next.addFilter(filter); - release(filter); - return next; - } - - public void acquire() { - try { - lock.lock(); - referenceCount++; - } finally { - lock.unlock(); - } - } - - public String toString() { - return "Chunk:" + id + " (" + path + ")"; - } -} diff -r ba454a26d2c1 -r 83e4343a6984 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSetLocation.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSetLocation.java Wed Jun 26 16:04:47 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,183 +0,0 @@ -/* - * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.jfr.consumer; - -import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; - -import jdk.jfr.internal.LogLevel; -import jdk.jfr.internal.LogTag; -import jdk.jfr.internal.Logger; -import jdk.jfr.internal.Repository; -import jdk.jfr.internal.consumer.ChunkHeader; - -/** - * This class corresponds to a disk repository. - *

- * Main purpose is to act as a cache if multiple {@code EventStream} want to - * access the same repository. An {@code EventSetLocation} should be released - * when it is no longer being used. - * - */ -final class EventSetLocation { - private static Map locations = new HashMap<>(); - - private final SortedMap eventSets = new TreeMap<>(); - private final Map lastPaths = new HashMap<>(); - - final Path path; - private int count = 0; - private volatile boolean closed; - - private EventSetLocation(Path path) { - this.path = path; - } - - public static EventSetLocation get(Path absolutPath) { - synchronized (locations) { - EventSetLocation esl = locations.get(absolutPath); - if (esl == null) { - esl = new EventSetLocation(absolutPath); - locations.put(absolutPath, esl); - } - esl.count++; - return esl; - } - } - - public static EventSetLocation current() throws IOException { - Repository.getRepository().ensureRepository(); - return get(Repository.getRepository().getRepositoryPath().toPath()); - } - - public void release() { - synchronized (locations) { - count--; - if (count == 0) { - locations.remove(path); - closed = true; - } - } - } - - public synchronized EventSet acquire(long startTimeNanos, EventSet previousEventSet) { - synchronized (eventSets) { - while (!closed) { - SortedMap after = eventSets.tailMap(startTimeNanos); - if (!after.isEmpty()) { - EventSet es = after.get(after.firstKey()); - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Acquired " + startTimeNanos + ", got " + es); - es.acquire(); - return es; - } - try { - if (updateEventSets(previousEventSet)) { - continue; - } - } catch (IOException e) { - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during event set update " + e.getMessage()); - // This can happen if a chunk is being removed - // between the file was discovered and an instance - // of an EventSet was constructed. Just ignore, - // and retry later. - } - try { - eventSets.wait(1000); - } catch (InterruptedException e) { - // ignore - } - } - } - return null; - } - - private boolean updateEventSets(EventSet previousEventSet) throws IOException { - boolean foundNew = false; - List added = new ArrayList<>(); - Set current = new HashSet<>(); - if (!Files.exists(path)) { - // Repository removed, probably due to shutdown - closed = true; - return true; - } - try (DirectoryStream dirStream = Files.newDirectoryStream(path, "*.jfr")) { - for (Path p : dirStream) { - if (!lastPaths.containsKey(p)) { - added.add(p); - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath()); - } - current.add(p); - } - } - List removed = new ArrayList<>(); - for (Path p : lastPaths.keySet()) { - if (!current.contains(p)) { - removed.add(p); - } - } - - for (Path remove : removed) { - Long time = lastPaths.get(remove); - eventSets.remove(time); - lastPaths.remove(remove); - } - Collections.sort(added, (p1,p2) -> p1.compareTo(p2)); - for (Path p : added) { - // Only add files that have a complete header - // as the JVM may be in progress writing the file - long size = Files.size(p); - if (size >= ChunkHeader.HEADER_SIZE) { - EventSet es = new EventSet(this, previousEventSet, p); - long startTime = es.getStartTimeNanos(); - if (startTime == 0) { - String errorMsg = "Chunk header should always contain a valid start time"; - throw new InternalError(errorMsg); - } - EventSet previous = eventSets.get(startTime); - if (previous != null) { - String errorMsg = "Found chunk " + p + " with the same start time " + startTime + " as previous chunk " + previous.getPath(); - throw new InternalError(errorMsg); - } - eventSets.put(startTime, es); - lastPaths.put(p, startTime); - previousEventSet = es; - foundNew = true; - } - } - return foundNew; - } -} diff -r ba454a26d2c1 -r 83e4343a6984 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Wed Jun 26 16:04:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Thu Jun 27 10:19:32 2019 +0200 @@ -110,7 +110,7 @@ protected boolean reuse = true; - protected InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; + protected volatile InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; private final AccessControlContext accessControlContext; private boolean started; @@ -184,6 +184,7 @@ } if (removeConsumer) { EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]); + eventFilter = buildFilter(array); consumersHandle.setVolatile(this, array); dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset // dispatch @@ -221,8 +222,22 @@ add(new EventDispatcher(eventName, action)); } + InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { + InternalEventFilter ef = new InternalEventFilter(); + for (EventDispatcher ed : dispatchers) { + String name = ed.eventName; + if (name == null) { + return InternalEventFilter.ACCEPT_ALL; + } + ef.setThreshold(name, 0); + } + return ef.threadSafe(); + } + private synchronized void add(EventDispatcher e) { - consumersHandle.setVolatile(this, merge(consumers, e)); + EventDispatcher[] dispatchers = merge(consumers,e); + eventFilter = buildFilter(dispatchers); + consumersHandle.setVolatile(this, dispatchers); dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset } diff -r ba454a26d2c1 -r 83e4343a6984 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/InternalEventFilter.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/InternalEventFilter.java Wed Jun 26 16:04:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/InternalEventFilter.java Thu Jun 27 10:19:32 2019 +0200 @@ -25,49 +25,29 @@ package jdk.jfr.internal.consumer; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; +import java.util.StringJoiner; public final class InternalEventFilter { - public static final InternalEventFilter ACCEPT_ALL = new InternalEventFilter(); - private final Map thresholds = new HashMap<>(); - private boolean acceptAll; - - public static InternalEventFilter merge(Collection filters) { - for (InternalEventFilter ef : filters) { - if (ef.getAcceptAll()) { - return ACCEPT_ALL; - } - } - if (filters.size() == 1) { - return filters.iterator().next(); - } + public static final InternalEventFilter ACCEPT_ALL = new InternalEventFilter(true, Map.of()); - Set eventNames = new HashSet<>(); - for (InternalEventFilter ef : filters) { - eventNames.addAll(ef.thresholds.keySet()); - } - InternalEventFilter result = new InternalEventFilter(); - for (String eventName : eventNames) { - for (InternalEventFilter ef : filters) { - Long l = ef.thresholds.get(eventName); - if (l != null) { - result.setThreshold(eventName, l.longValue()); - } - } - } - return result; + private final Map thresholds; + private final boolean acceptAll; + + public InternalEventFilter() { + this(false, new HashMap<>()); } - private boolean getAcceptAll() { - return acceptAll; + // returns an instance that can be passed to + // another thread safely + public InternalEventFilter threadSafe() { + return new InternalEventFilter(acceptAll, thresholds); } - public void setAcceptAll() { - acceptAll = true; + private InternalEventFilter(boolean acceptAll, Map thresholds) { + this.acceptAll = acceptAll; + this.thresholds = thresholds; } public void setThreshold(String eventName, long nanos) { @@ -82,7 +62,7 @@ public long getThreshold(String eventName) { if (acceptAll) { - return 0; + return 0L; } Long l = thresholds.get(eventName); if (l != null) { @@ -90,17 +70,16 @@ } return -1; } + public String toString() { if (acceptAll) { return "ACCEPT ALL"; } - StringBuilder sb = new StringBuilder(); + + StringJoiner sb = new StringJoiner(", "); for (String key : thresholds.keySet().toArray(new String[0])) { Long value = thresholds.get(key); - sb.append(key); - sb.append(" = "); - sb.append(value.longValue() / 1_000_000); - sb.append(" ms"); + sb.add(key + " = " + value); } return sb.toString(); } diff -r ba454a26d2c1 -r 83e4343a6984 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Thu Jun 27 10:19:32 2019 +0200 @@ -0,0 +1,126 @@ +package jdk.jfr.internal.consumer; + +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import jdk.jfr.internal.LogLevel; +import jdk.jfr.internal.LogTag; +import jdk.jfr.internal.Logger; +import jdk.jfr.internal.Repository; + +public final class RepositoryFiles { + private final Path repostory; + private final SortedMap pathSet = new TreeMap<>(); + private final Map pathLookup = new HashMap<>(); + private volatile boolean closed; + + public RepositoryFiles(Path repostory) { + this.repostory = repostory; + } + + public long getTimestamp(Path p) { + return pathLookup.get(p); + } + + public Path nextPath(long startTimeNanos) { + while (!closed) { + SortedMap after = pathSet.tailMap(startTimeNanos); + if (!after.isEmpty()) { + Path path = after.get(after.firstKey()); + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos); + return path; + } + try { + if (updatePaths(repostory)) { + continue; + } + } catch (IOException e) { + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during repository file scan " + e.getMessage()); + // This can happen if a chunk is being removed + // between the file was discovered and an instance + // of an EventSet was constructed. Just ignore, + // and retry later. + } + try { + synchronized (pathSet) { + pathSet.wait(1000); + } + } catch (InterruptedException e) { + // ignore + } + } + return null; + } + + private boolean updatePaths(Path repo) throws IOException { + if (repo == null) { + repo = Repository.getRepository().getRepositoryPath().toPath(); + } + boolean foundNew = false; + List added = new ArrayList<>(); + Set current = new HashSet<>(); + if (!Files.exists(repo)) { + // Repository removed, probably due to shutdown + return true; + } + try (DirectoryStream dirStream = Files.newDirectoryStream(repo, "*.jfr")) { + for (Path p : dirStream) { + if (!pathLookup.containsKey(p)) { + added.add(p); + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath()); + } + current.add(p); + } + } + List removed = new ArrayList<>(); + for (Path p : pathLookup.keySet()) { + if (!current.contains(p)) { + removed.add(p); + } + } + + for (Path remove : removed) { + Long time = pathLookup.get(remove); + pathSet.remove(time); + pathLookup.remove(remove); + } + Collections.sort(added, (p1, p2) -> p1.compareTo(p2)); + for (Path p : added) { + // Only add files that have a complete header + // as the JVM may be in progress writing the file + long size = Files.size(p); + if (size >= ChunkHeader.HEADER_SIZE) { + long startNanos = readStartTime(p); + pathSet.put(startNanos, p); + pathLookup.put(p, startNanos); + foundNew = true; + } + } + return foundNew; + } + + private long readStartTime(Path p) throws IOException { + try (RecordingInput in = new RecordingInput(p.toFile(), 100)) { + ChunkHeader c = new ChunkHeader(in); + return c.getStartNanos(); + } + } + + public void close() { + synchronized (pathSet) { + this.closed = true; + pathSet.notify(); + } + } +} \ No newline at end of file