Clean up and fix parser level filtering JEP-349-branch
authoregahlin
Thu, 27 Jun 2019 10:19:32 +0200
branchJEP-349-branch
changeset 57433 83e4343a6984
parent 57432 ba454a26d2c1
child 57434 216bf2e3b542
Clean up and fix parser level filtering
src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSetLocation.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/InternalEventFilter.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Wed Jun 26 16:04:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Thu Jun 27 10:19:32 2019 +0200
@@ -367,17 +367,19 @@
         parsers.forEach(p -> {
             if (p instanceof EventParser) {
                 EventParser ep = (EventParser) p;
+                String name = ep.getEventType().getName();
                 ep.setOrdered(ordered);
                 ep.setReuse(reuse);
                 if (resetEventCache) {
                     ep.resetCache();
                 }
-                long threshold = eventFilter.getThreshold(ep.getEventType().getName());
+                long threshold = eventFilter.getThreshold(name);
                 if (threshold >= 0) {
                     ep.setEnabled(true);
                     ep.setThreshold(timeConverter.convertDurationNanos(threshold));
                 } else {
-                    ep.setThreshold(-1L);
+                    ep.setEnabled(false);
+                    ep.setThreshold(Long.MAX_VALUE);
                 }
             }
         });
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Wed Jun 26 16:04:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Thu Jun 27 10:19:32 2019 +0200
@@ -26,33 +26,18 @@
 package jdk.jfr.consumer;
 
 import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.AccessControlContext;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.function.Consumer;
 
-import jdk.jfr.internal.LogLevel;
-import jdk.jfr.internal.LogTag;
-import jdk.jfr.internal.Logger;
-import jdk.jfr.internal.Repository;
-import jdk.jfr.internal.consumer.ChunkHeader;
 import jdk.jfr.internal.consumer.EventConsumer;
 import jdk.jfr.internal.consumer.RecordingInput;
+import jdk.jfr.internal.consumer.RepositoryFiles;
 
 /**
  * Implementation of an {@code EventStream}} that operates against a directory
@@ -61,112 +46,6 @@
  */
 final class EventDirectoryStream implements EventStream {
 
-    private static final class RepositoryFiles {
-        private final Path repostory;
-        private final SortedMap<Long, Path> pathSet = new TreeMap<>();
-        private final Map<Path, Long> pathLookup = new HashMap<>();
-        private volatile boolean closed;
-
-        public RepositoryFiles(Path repostory) {
-            this.repostory = repostory;
-        }
-
-        long getTimestamp(Path p) {
-            return pathLookup.get(p);
-        }
-
-        Path nextPath(long startTimeNanos) {
-            while (!closed) {
-                SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
-                if (!after.isEmpty()) {
-                    Path path = after.get(after.firstKey());
-                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
-                    return path;
-                }
-                try {
-                    if (updatePaths(repostory)) {
-                        continue;
-                    }
-                } catch (IOException e) {
-                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during repository file scan " + e.getMessage());
-                    // This can happen if a chunk is being removed
-                    // between the file was discovered and an instance
-                    // of an EventSet was constructed. Just ignore,
-                    // and retry later.
-                }
-                try {
-                    synchronized (pathSet) {
-                        pathSet.wait(1000);
-                    }
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
-            return null;
-        }
-
-        private boolean updatePaths(Path repo) throws IOException {
-            if (repo == null) {
-                repo = Repository.getRepository().getRepositoryPath().toPath();
-            }
-            boolean foundNew = false;
-            List<Path> added = new ArrayList<>();
-            Set<Path> current = new HashSet<>();
-            if (!Files.exists(repo)) {
-                // Repository removed, probably due to shutdown
-                return true;
-            }
-            try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo, "*.jfr")) {
-                for (Path p : dirStream) {
-                    if (!pathLookup.containsKey(p)) {
-                        added.add(p);
-                        Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
-                    }
-                    current.add(p);
-                }
-            }
-            List<Path> removed = new ArrayList<>();
-            for (Path p : pathLookup.keySet()) {
-                if (!current.contains(p)) {
-                    removed.add(p);
-                }
-            }
-
-            for (Path remove : removed) {
-                Long time = pathLookup.get(remove);
-                pathSet.remove(time);
-                pathLookup.remove(remove);
-            }
-            Collections.sort(added, (p1, p2) -> p1.compareTo(p2));
-            for (Path p : added) {
-                // Only add files that have a complete header
-                // as the JVM may be in progress writing the file
-                long size = Files.size(p);
-                if (size >= ChunkHeader.HEADER_SIZE) {
-                    long startNanos = readStartTime(p);
-                    pathSet.put(startNanos, p);
-                    pathLookup.put(p, startNanos);
-                    foundNew = true;
-                }
-            }
-            return foundNew;
-        }
-
-        private long readStartTime(Path p) throws IOException {
-            try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
-                ChunkHeader c = new ChunkHeader(in);
-                return c.getStartNanos();
-            }
-        }
-
-        public void close() {
-            synchronized (pathSet) {
-                this.closed = true;
-                pathSet.notify();
-            }
-        }
-    }
-
     static final class ParserConsumer extends EventConsumer {
 
         private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime);
@@ -260,92 +139,12 @@
             return awaitNewEvents;
         }
 
-        public void setReuse(boolean reuse) {
-            this.reuse = reuse;
-        }
-
-        public void setOrdered(boolean ordered) {
-            this.ordered = ordered;
-        }
-
         @Override
         public void close() {
             repositoryFiles.close();
         }
     }
 
-    static final class SharedParserConsumer extends EventConsumer {
-        private EventSetLocation location;
-        private EventSet eventSet;
-        private int eventSetIndex;
-        private int eventArrayIndex;
-        private RecordedEvent[] currentEventArray = new RecordedEvent[0];
-
-        public SharedParserConsumer(AccessControlContext acc) throws IOException {
-            super(acc);
-        }
-
-        public void process() throws IOException {
-            this.location = EventSetLocation.current();
-            this.eventSet = location.acquire(startNanos, null); // use timestamp
-                                                                // from
-            if (eventSet == null) {
-                return;
-            }
-            while (!isClosed()) {
-                processSegment();
-                runFlushActions();
-                do {
-                    if (isClosed()) {
-                        return;
-                    }
-                    currentEventArray = eventSet.readEvents(eventSetIndex);
-                    if (currentEventArray == EventSet.END_OF_SET) {
-                        eventSet = eventSet.next(eventFilter);
-                        if (eventSet == null || isClosed()) {
-                            return;
-                        }
-                        eventSetIndex = 0;
-                        continue;
-                    }
-                    if (currentEventArray == null) {
-                        return; // no more events
-                    }
-                    eventSetIndex++;
-                } while (currentEventArray.length == 0);
-                eventArrayIndex = 0;
-            }
-        }
-
-        private void processSegment() {
-            while (eventArrayIndex < currentEventArray.length) {
-                RecordedEvent e = currentEventArray[eventArrayIndex++];
-                if (e == null) {
-                    return;
-                }
-                dispatch(e);
-            }
-        }
-
-        public void close() {
-            setClosed(true);
-            // TODO: Data races here, must fix
-            synchronized (this) {
-                if (eventSet != null) {
-                    eventSet.release(null);
-                }
-                if (location != null) {
-                    location.release();
-                }
-            }
-            runCloseActions();
-        }
-
-        public void setReuse(boolean reuse) {
-            // ignore hint
-        }
-    }
-
     private final EventConsumer eventConsumer;
 
     public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Wed Jun 26 16:04:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Thu Jun 27 10:19:32 2019 +0200
@@ -82,7 +82,6 @@
             }
         }
 
-
         private void processOrdered() throws IOException {
             if (sortedList == null) {
                 sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
@@ -188,6 +187,7 @@
         eventConsumer.start(0);
     }
 
+    @Override
     public void setReuse(boolean reuse) {
         eventConsumer.setReuse(reuse);
     }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java	Wed Jun 26 16:04:47 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,241 +0,0 @@
-/*
- * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.jfr.consumer;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-
-import jdk.jfr.internal.consumer.ChunkHeader;
-import jdk.jfr.internal.consumer.InternalEventFilter;
-import jdk.jfr.internal.consumer.RecordingInput;
-
-/**
- * Cache that represents all discovered events in a chunk.
- *
- */
-final class EventSet {
-
-    public static final RecordedEvent[] END_OF_SET = new RecordedEvent[0];
-    private static final AtomicInteger idCounter = new AtomicInteger(-1);
-
-    private volatile Object[][] segments = new Object[1000][];
-    private volatile boolean closed;
-    private final long startTimeNanos;
-    private final EventSetLocation location;
-    private final Path path;
-    private final int id;
-
-    // Guarded by lock
-    private boolean awaitNewEvents;
-    private RecordingInput input;
-    private ChunkParser chunkParser;
-    private int referenceCount;
-    private final ReentrantLock lock = new ReentrantLock();
-    private final Set<InternalEventFilter> filters = new HashSet<>();
-    private InternalEventFilter globalFilter = InternalEventFilter.ACCEPT_ALL;
-    private boolean dirtyFilter = true;
-
-    public void release(InternalEventFilter eventFilter) {
-        try {
-            lock.lock();
-            filters.remove(eventFilter);
-            updateGlobalFilter();
-            referenceCount--;
-            if (referenceCount == 0) {
-                closed = true;
-                if (input != null) {
-                    try {
-                        input.close();
-                    } catch (IOException e) {
-                        // TODO: Flie locked by other process?
-                    }
-                    chunkParser = null;
-                    input = null;
-                }
-            }
-        } finally {
-           lock.unlock();
-        }
-    }
-
-    public EventSet(EventSetLocation location, EventSet previousEventSet, Path p) throws IOException {
-        this.location = location;
-        this.path = p;
-        this.startTimeNanos = readStartTime(p);
-        this.id = idCounter.incrementAndGet();
-    }
-
-    private long readStartTime(Path p) throws IOException {
-        try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
-            ChunkHeader c = new ChunkHeader(in);
-            return c.getStartNanos();
-        }
-    }
-
-    Path getPath() {
-        return path;
-    }
-
-    // TODO: Use binary search, must use lock
-    public int findIndex(Instant timestamp) {
-        int index = 0;
-        for (int i = 0; i < segments.length; i++) {
-            RecordedEvent[] events = (RecordedEvent[]) segments[i];
-            if (events == null || events.length == 0) {
-                return Math.max(index - 1, 0);
-            }
-            RecordedEvent e = events[0]; // May not be sorted.
-            if (timestamp.isAfter(e.getEndTime())) {
-                return Math.max(index - 1, 0);
-            }
-        }
-        return segments.length;
-    }
-
-    public void addFilter(InternalEventFilter filter) {
-        try {
-            lock.lock();
-            filters.add(filter);
-            updateGlobalFilter();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    // held with lock
-    private void updateGlobalFilter() {
-        globalFilter = InternalEventFilter.merge(filters);
-        dirtyFilter = true;
-    }
-
-    public RecordedEvent[] readEvents(int index) throws IOException {
-        while (!closed) {
-
-            RecordedEvent[] events = (RecordedEvent[]) segments[index];
-            if (events != null) {
-                return events;
-            }
-            if (await()) {
-                try {
-                    addSegment(index);
-                } finally {
-                    lock.unlock();
-                }
-            }
-        }
-        return null;
-    }
-
-    private boolean await()  {
-        try {
-            return lock.tryLock(250, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            return false;
-        }
-    }
-
-    // held with lock
-    private void addSegment(int index) throws IOException {
-        if (chunkParser == null) {
-            chunkParser = new ChunkParser(new RecordingInput(path.toFile()), false);
-        }
-        if (dirtyFilter) {
-            chunkParser.setParserFilter(globalFilter);
-        }
-        if (segments[index] != null) {
-            return;
-        }
-        if (index == segments.length - 2) {
-            segments = Arrays.copyOf(segments, segments.length * 2);
-        }
-        RecordedEvent[] segment = new RecordedEvent[10];
-        int i = 0;
-        while (true) {
-            RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
-            if (e == null) {
-                // wait for new event with next call to readStreamingEvent()
-                awaitNewEvents = true;
-                break;
-            }
-            awaitNewEvents = false;
-            if (i == segment.length) {
-                segment = Arrays.copyOf(segment, segment.length * 2);
-            }
-            segment[i++] = e;
-        }
-
-        // no events found
-        if (i == 0) {
-            if (chunkParser.isChunkFinished()) {
-                segments[index] = END_OF_SET;
-                return;
-            }
-        }
-        // at least 2 events, sort them
-        if (i > 1) {
-            Arrays.sort(segment, 0, i, (e1, e2) -> Long.compare(e1.endTime, e2.endTime));
-        }
-        segments[index] = segment;
-        if (chunkParser.isChunkFinished()) {
-            segments[index + 1] = END_OF_SET;
-        }
-    }
-
-    public long getStartTimeNanos() {
-        return startTimeNanos;
-    }
-
-    public EventSet next(InternalEventFilter filter) throws IOException {
-        EventSet next = location.acquire(startTimeNanos + 1, this);
-        if (next == null) {
-            // closed
-            return null;
-        }
-        next.addFilter(filter);
-        release(filter);
-        return next;
-    }
-
-    public void acquire() {
-        try {
-            lock.lock();
-            referenceCount++;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public String toString() {
-        return "Chunk:" + id + " (" + path + ")";
-    }
-}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSetLocation.java	Wed Jun 26 16:04:47 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,183 +0,0 @@
-/*
- * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.jfr.consumer;
-
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import jdk.jfr.internal.LogLevel;
-import jdk.jfr.internal.LogTag;
-import jdk.jfr.internal.Logger;
-import jdk.jfr.internal.Repository;
-import jdk.jfr.internal.consumer.ChunkHeader;
-
-/**
- * This class corresponds to a disk repository.
- * <p>
- * Main purpose is to act as a cache if multiple {@code EventStream} want to
- * access the same repository. An {@code EventSetLocation} should be released
- * when it is no longer being used.
- *
- */
-final class EventSetLocation {
-    private static Map<Path, EventSetLocation> locations = new HashMap<>();
-
-    private final SortedMap<Long, EventSet> eventSets = new TreeMap<>();
-    private final Map<Path, Long> lastPaths = new HashMap<>();
-
-    final Path path;
-    private int count = 0;
-    private volatile boolean closed;
-
-    private EventSetLocation(Path path) {
-        this.path = path;
-    }
-
-    public static EventSetLocation get(Path absolutPath) {
-        synchronized (locations) {
-            EventSetLocation esl = locations.get(absolutPath);
-            if (esl == null) {
-                esl = new EventSetLocation(absolutPath);
-                locations.put(absolutPath, esl);
-            }
-            esl.count++;
-            return esl;
-        }
-    }
-
-    public static EventSetLocation current() throws IOException {
-        Repository.getRepository().ensureRepository();
-        return get(Repository.getRepository().getRepositoryPath().toPath());
-    }
-
-    public void release() {
-        synchronized (locations) {
-            count--;
-            if (count == 0) {
-                locations.remove(path);
-                closed = true;
-            }
-        }
-    }
-
-    public synchronized EventSet acquire(long startTimeNanos, EventSet previousEventSet) {
-        synchronized (eventSets) {
-            while (!closed) {
-                SortedMap<Long, EventSet> after = eventSets.tailMap(startTimeNanos);
-                if (!after.isEmpty()) {
-                    EventSet es =  after.get(after.firstKey());
-                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Acquired " + startTimeNanos + ", got " + es);
-                    es.acquire();
-                    return es;
-                }
-                try {
-                    if (updateEventSets(previousEventSet)) {
-                        continue;
-                    }
-                } catch (IOException e) {
-                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during event set update " + e.getMessage());
-                    // This can happen if a chunk is being removed
-                    // between the file was discovered and an instance
-                    // of an EventSet was constructed. Just ignore,
-                    // and retry later.
-                }
-                try {
-                    eventSets.wait(1000);
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
-        }
-        return null;
-    }
-
-    private boolean updateEventSets(EventSet previousEventSet) throws IOException {
-        boolean foundNew = false;
-        List<Path> added = new ArrayList<>();
-        Set<Path> current = new HashSet<>();
-        if (!Files.exists(path)) {
-            // Repository removed, probably due to shutdown
-            closed = true;
-            return true;
-        }
-        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(path, "*.jfr")) {
-            for (Path p : dirStream) {
-                if (!lastPaths.containsKey(p)) {
-                    added.add(p);
-                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
-                }
-                current.add(p);
-            }
-        }
-        List<Path> removed = new ArrayList<>();
-        for (Path p : lastPaths.keySet()) {
-            if (!current.contains(p)) {
-                removed.add(p);
-            }
-        }
-
-        for (Path remove : removed) {
-            Long time = lastPaths.get(remove);
-            eventSets.remove(time);
-            lastPaths.remove(remove);
-        }
-        Collections.sort(added, (p1,p2) -> p1.compareTo(p2));
-        for (Path p : added) {
-            // Only add files that have a complete header
-            // as the JVM may be in progress writing the file
-            long size = Files.size(p);
-            if (size >= ChunkHeader.HEADER_SIZE) {
-                EventSet es = new EventSet(this, previousEventSet, p);
-                long startTime = es.getStartTimeNanos();
-                if (startTime == 0) {
-                    String errorMsg = "Chunk header should always contain a valid start time";
-                    throw new InternalError(errorMsg);
-                }
-                EventSet previous = eventSets.get(startTime);
-                if (previous != null) {
-                    String errorMsg = "Found chunk " + p + " with the same start time " + startTime + " as previous chunk " + previous.getPath();
-                    throw new InternalError(errorMsg);
-                }
-                eventSets.put(startTime, es);
-                lastPaths.put(p, startTime);
-                previousEventSet = es;
-                foundNew = true;
-            }
-        }
-        return foundNew;
-    }
-}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Wed Jun 26 16:04:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java	Thu Jun 27 10:19:32 2019 +0200
@@ -110,7 +110,7 @@
 
     protected boolean reuse = true;
 
-    protected InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
+    protected volatile InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
 
     private final AccessControlContext accessControlContext;
     private boolean started;
@@ -184,6 +184,7 @@
         }
         if (removeConsumer) {
             EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]);
+            eventFilter = buildFilter(array);
             consumersHandle.setVolatile(this, array);
             dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
                                                                  // dispatch
@@ -221,8 +222,22 @@
         add(new EventDispatcher(eventName, action));
     }
 
+    InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
+        InternalEventFilter ef = new InternalEventFilter();
+        for (EventDispatcher ed : dispatchers) {
+            String name = ed.eventName;
+            if (name == null) {
+                return InternalEventFilter.ACCEPT_ALL;
+            }
+            ef.setThreshold(name, 0);
+        }
+        return ef.threadSafe();
+    }
+
     private synchronized void add(EventDispatcher e) {
-        consumersHandle.setVolatile(this, merge(consumers, e));
+        EventDispatcher[] dispatchers = merge(consumers,e);
+        eventFilter = buildFilter(dispatchers);
+        consumersHandle.setVolatile(this, dispatchers);
         dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
     }
 
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/InternalEventFilter.java	Wed Jun 26 16:04:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/InternalEventFilter.java	Thu Jun 27 10:19:32 2019 +0200
@@ -25,49 +25,29 @@
 
 package jdk.jfr.internal.consumer;
 
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
+import java.util.StringJoiner;
 
 public final class InternalEventFilter {
-    public static final InternalEventFilter ACCEPT_ALL = new InternalEventFilter();
-    private final Map<String, Long> thresholds = new HashMap<>();
-    private boolean acceptAll;
-
-    public static InternalEventFilter merge(Collection<InternalEventFilter> filters) {
-        for (InternalEventFilter ef : filters) {
-            if (ef.getAcceptAll()) {
-                return ACCEPT_ALL;
-            }
-        }
-        if (filters.size() == 1) {
-            return filters.iterator().next();
-        }
+    public static final InternalEventFilter ACCEPT_ALL = new InternalEventFilter(true, Map.of());
 
-        Set<String> eventNames = new HashSet<>();
-        for (InternalEventFilter ef : filters) {
-            eventNames.addAll(ef.thresholds.keySet());
-        }
-        InternalEventFilter result = new InternalEventFilter();
-        for (String eventName : eventNames) {
-            for (InternalEventFilter ef : filters) {
-                Long l = ef.thresholds.get(eventName);
-                if (l != null) {
-                    result.setThreshold(eventName, l.longValue());
-                }
-            }
-        }
-        return result;
+    private final Map<String, Long> thresholds;
+    private final boolean acceptAll;
+
+    public InternalEventFilter() {
+        this(false, new HashMap<>());
     }
 
-    private boolean getAcceptAll() {
-        return acceptAll;
+    // returns an instance that can be passed to
+    // another thread safely
+    public InternalEventFilter threadSafe() {
+        return new InternalEventFilter(acceptAll, thresholds);
     }
 
-    public void setAcceptAll() {
-        acceptAll = true;
+    private InternalEventFilter(boolean acceptAll, Map<String, Long> thresholds) {
+        this.acceptAll = acceptAll;
+        this.thresholds = thresholds;
     }
 
     public void setThreshold(String eventName, long nanos) {
@@ -82,7 +62,7 @@
 
     public long getThreshold(String eventName) {
         if (acceptAll) {
-            return 0;
+            return 0L;
         }
         Long l = thresholds.get(eventName);
         if (l != null) {
@@ -90,17 +70,16 @@
         }
         return -1;
     }
+
     public String toString() {
         if (acceptAll) {
             return "ACCEPT ALL";
         }
-        StringBuilder sb = new StringBuilder();
+
+        StringJoiner sb = new StringJoiner(", ");
         for (String key : thresholds.keySet().toArray(new String[0])) {
             Long value = thresholds.get(key);
-            sb.append(key);
-            sb.append(" = ");
-            sb.append(value.longValue() / 1_000_000);
-            sb.append(" ms");
+            sb.add(key + " = " + value);
         }
         return sb.toString();
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java	Thu Jun 27 10:19:32 2019 +0200
@@ -0,0 +1,126 @@
+package jdk.jfr.internal.consumer;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import jdk.jfr.internal.LogLevel;
+import jdk.jfr.internal.LogTag;
+import jdk.jfr.internal.Logger;
+import jdk.jfr.internal.Repository;
+
+public final class RepositoryFiles {
+    private final Path repostory;
+    private final SortedMap<Long, Path> pathSet = new TreeMap<>();
+    private final Map<Path, Long> pathLookup = new HashMap<>();
+    private volatile boolean closed;
+
+    public RepositoryFiles(Path repostory) {
+        this.repostory = repostory;
+    }
+
+    public long getTimestamp(Path p) {
+        return pathLookup.get(p);
+    }
+
+    public Path nextPath(long startTimeNanos) {
+        while (!closed) {
+            SortedMap<Long, Path> after = pathSet.tailMap(startTimeNanos);
+            if (!after.isEmpty()) {
+                Path path = after.get(after.firstKey());
+                Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.TRACE, "Return path " + path + " for start time nanos " + startTimeNanos);
+                return path;
+            }
+            try {
+                if (updatePaths(repostory)) {
+                    continue;
+                }
+            } catch (IOException e) {
+                Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "IOException during repository file scan " + e.getMessage());
+                // This can happen if a chunk is being removed
+                // between the file was discovered and an instance
+                // of an EventSet was constructed. Just ignore,
+                // and retry later.
+            }
+            try {
+                synchronized (pathSet) {
+                    pathSet.wait(1000);
+                }
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+        return null;
+    }
+
+    private boolean updatePaths(Path repo) throws IOException {
+        if (repo == null) {
+            repo = Repository.getRepository().getRepositoryPath().toPath();
+        }
+        boolean foundNew = false;
+        List<Path> added = new ArrayList<>();
+        Set<Path> current = new HashSet<>();
+        if (!Files.exists(repo)) {
+            // Repository removed, probably due to shutdown
+            return true;
+        }
+        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo, "*.jfr")) {
+            for (Path p : dirStream) {
+                if (!pathLookup.containsKey(p)) {
+                    added.add(p);
+                    Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
+                }
+                current.add(p);
+            }
+        }
+        List<Path> removed = new ArrayList<>();
+        for (Path p : pathLookup.keySet()) {
+            if (!current.contains(p)) {
+                removed.add(p);
+            }
+        }
+
+        for (Path remove : removed) {
+            Long time = pathLookup.get(remove);
+            pathSet.remove(time);
+            pathLookup.remove(remove);
+        }
+        Collections.sort(added, (p1, p2) -> p1.compareTo(p2));
+        for (Path p : added) {
+            // Only add files that have a complete header
+            // as the JVM may be in progress writing the file
+            long size = Files.size(p);
+            if (size >= ChunkHeader.HEADER_SIZE) {
+                long startNanos = readStartTime(p);
+                pathSet.put(startNanos, p);
+                pathLookup.put(p, startNanos);
+                foundNew = true;
+            }
+        }
+        return foundNew;
+    }
+
+    private long readStartTime(Path p) throws IOException {
+        try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
+            ChunkHeader c = new ChunkHeader(in);
+            return c.getStartNanos();
+        }
+    }
+
+    public void close() {
+        synchronized (pathSet) {
+            this.closed = true;
+            pathSet.notify();
+        }
+    }
+}
\ No newline at end of file