Add EventStream::setEndTime(...) and a first stab at priviliged access to local repository JEP-349-branch
authoregahlin
Fri, 09 Aug 2019 01:18:18 +0200
branchJEP-349-branch
changeset 57690 9316d02dd4a5
parent 57641 5fb8ececb9e6
child 57702 c75c241c492a
Add EventStream::setEndTime(...) and a first stab at priviliged access to local repository
src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/FileAccess.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java
src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java
src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Disassemble.java
src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Summary.java
test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetEndTime.java
test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetStartTime.java
test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Fri Aug 09 01:18:18 2019 +0200
@@ -45,6 +45,7 @@
 import jdk.jfr.internal.LogTag;
 import jdk.jfr.internal.Logger;
 import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.Utils;
 import jdk.jfr.internal.consumer.InternalEventFilter;
 
 /*
@@ -71,10 +72,11 @@
         private boolean reuse = true;
         private boolean ordered = true;
         private Instant startTime = null;
+        private Instant endTime = null;
         private boolean started = false;
         private long startNanos = 0;
+        private long endNanos = Long.MAX_VALUE;
         private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
-
         private boolean changed = false;
 
         public StreamConfiguration(StreamConfiguration configuration) {
@@ -86,8 +88,10 @@
             this.reuse = configuration.reuse;
             this.ordered = configuration.ordered;
             this.startTime = configuration.startTime;
+            this.endTime = configuration.endTime;
             this.started = configuration.started;
             this.startNanos = configuration.startNanos;
+            this.endNanos = configuration.endNanos;
             this.dispatcherLookup = configuration.dispatcherLookup;
         }
 
@@ -197,9 +201,16 @@
             changed = true;
             return this;
         }
+        public StreamConfiguration setEndTime(Instant endTime) {
+            this.endTime = endTime;
+            this.endNanos = Utils.timeToNanos(endTime);
+            changed = true;
+            return this;
+        }
 
         final public StreamConfiguration setStartTime(Instant startTime) {
             this.startTime = startTime;
+            this.startNanos = Utils.timeToNanos(startTime);
             changed = true;
             return this;
         }
@@ -208,6 +219,10 @@
             return startTime;
         }
 
+        public Object getEndTime() {
+            return endTime;
+        }
+
         final public boolean isStarted() {
             return started;
         }
@@ -243,6 +258,10 @@
             return startNanos;
         }
 
+        final public long getEndNanos() {
+            return endNanos;
+        }
+
         final public InternalEventFilter getFilter() {
             return eventFilter;
         }
@@ -264,12 +283,18 @@
             sb.append("Started: ").append(started).append("\n");
             sb.append("Start Time: ").append(startTime).append("\n");
             sb.append("Start Nanos: ").append(startNanos).append("\n");
+            sb.append("End Time: ").append(endTime).append("\n");
+            sb.append("End Nanos: ").append(endNanos).append("\n");
             return sb.toString();
         }
 
         private EventDispatcher[] getDispatchers() {
             return dispatchers;
         }
+
+
+
+
     }
 
     final static class EventDispatcher {
@@ -296,22 +321,22 @@
         }
     }
 
-    public final static Instant NEXT_EVENT = Instant.now();
     public final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
 
     private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
     private final AccessControlContext accessControlContext;
     private final Thread thread;
-
-    // Update bu updateConfiguration()
+    private final boolean active;
+    // Update by updateConfiguration()
     protected StreamConfiguration configuration = new StreamConfiguration();
 
     // Cache the last event type and dispatch.
     private EventType lastEventType;
     private EventDispatcher[] lastEventDispatch;
 
-    public AbstractEventStream(AccessControlContext acc) throws IOException {
+    public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
         this.accessControlContext = acc;
+        this.active = active;
         // Create thread object in constructor to ensure caller has
         // permission before constructing object
         thread = new Thread(this);
@@ -460,16 +485,21 @@
         if (configuration.isStarted()) {
             throw new IllegalStateException("Stream is already started");
         }
-        if (startTime == null) {
-            return;
-        }
         if (startTime.isBefore(Instant.EPOCH)) {
             startTime = Instant.EPOCH;
         }
         updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
     }
 
-    private boolean updateConfiguration(StreamConfiguration newConfiguration) {
+    public final void setEndTime(Instant endTime) {
+    if (configuration.isStarted()) {
+        throw new IllegalStateException("Stream is already started");
+    }
+    updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
+}
+
+
+    protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
         // Changes to the configuration must be serialized, so make
         // sure that we have the monitor
         Thread.holdsLock(this);
@@ -490,32 +520,27 @@
     }
 
     public final void startAsync(long startNanos) {
+        startInternal(startNanos);
+        thread.start();
+    }
+
+    public final void start(long startNanos) {
+        startInternal(startNanos);
+        run();
+    }
+
+    private void startInternal(long startNanos) {
         synchronized (this) {
             if (configuration.isStarted()) {
                 throw new IllegalStateException("Event stream can only be started once");
             }
             StreamConfiguration c = new StreamConfiguration(configuration);
-            c.setStartNanos(startNanos);
+            if (active) {
+                c.setStartNanos(startNanos);
+            }
             c.setStarted(true);
             updateConfiguration(c);
         }
-        thread.start();
-    }
-
-    public final void start(long startNanos) {
-        synchronized (this) {
-            if (configuration.isStarted()) {
-                throw new IllegalStateException("Event stream can only be started once");
-            }
-            StreamConfiguration c = new StreamConfiguration(configuration);
-            if (c.getStartTime() != null) {
-                startNanos= c.getStartTime().toEpochMilli() * 1_000_000L;
-            }
-            c.setStartNanos(startNanos);
-            c.setStarted(true);
-            updateConfiguration(c);
-        }
-        run();
     }
 
     public final void awaitTermination(Duration timeout) {
@@ -534,4 +559,5 @@
     }
 
     abstract public void close();
+
 }
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Fri Aug 09 01:18:18 2019 +0200
@@ -64,7 +64,8 @@
     private boolean reuse;
     private boolean ordered;
     private boolean resetEventCache;
-    private long firstNanos;
+    private long firstNanos = 0;
+    private long lastNanos = Long.MAX_VALUE;
 
     public ChunkParser(RecordingInput input, boolean reuse) throws IOException {
        this(new ChunkHeader(input), null, 1000);
@@ -118,7 +119,6 @@
         return this.eventFilter;
     }
 
-
     /**
      * Reads an event and returns null when segment or chunk ends.
      *
@@ -373,6 +373,10 @@
         }
         this.firstNanos = firstNanos;
     }
+    public void setLastNanos(long lastNanos) {
+        this.lastNanos = lastNanos;
+    }
+
 
     // Need to call updateEventParsers() for
     // change to take effect
@@ -388,6 +392,7 @@
                 ep.setOrdered(ordered);
                 ep.setReuse(reuse);
                 ep.setFirstNanos(firstNanos);
+                ep.setLastNanos(lastNanos);
                 if (resetEventCache) {
                     ep.resetCache();
                 }
@@ -407,4 +412,10 @@
     public long getChunkDuration() {
         return chunkHeader.getDurationNanos();
     }
+
+    public long getStartNanos() {
+        return chunkHeader.getStartNanos();
+    }
+
+
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri Aug 09 01:18:18 2019 +0200
@@ -36,6 +36,8 @@
 import java.util.function.Consumer;
 
 import jdk.jfr.internal.SecuritySupport.SafePath;
+import jdk.jfr.internal.Utils;
+import jdk.jfr.internal.consumer.FileAccess;
 import jdk.jfr.internal.consumer.RecordingInput;
 import jdk.jfr.internal.consumer.RepositoryFiles;
 
@@ -44,7 +46,7 @@
  * with chunk files.
  *
  */
-final class EventDirectoryStream implements EventStream {
+class EventDirectoryStream implements EventStream {
 
     static final class DirectoryStream extends AbstractEventStream {
 
@@ -52,34 +54,38 @@
         private static final int DEFAULT_ARRAY_SIZE = 10_000;
 
         private final RepositoryFiles repositoryFiles;
-
+        private final boolean active;
+        private final FileAccess fileAccess;
         private ChunkParser chunkParser;
         private RecordedEvent[] sortedList;
         protected long chunkStartNanos;
 
-        public DirectoryStream(AccessControlContext acc, Path p) throws IOException {
-            super(acc);
-            repositoryFiles = new RepositoryFiles(p == null ? null : new SafePath(p));
+        public DirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
+            super(acc, active);
+            this.fileAccess = fileAccess;
+            this.active = active;
+            repositoryFiles = new RepositoryFiles(fileAccess, p == null ? null : new SafePath(p));
         }
 
         @Override
         public void process() throws IOException {
-            StreamConfiguration c1 = configuration;
-            chunkStartNanos = c1.getStartNanos();
+            final StreamConfiguration c1 = configuration;
             Path path;
-            if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) {
-                // TODO: Need to wait for next segment to arrive and then
-                // use first event, but this will do for.
+            boolean validStartTime = active || c1.getStartTime() != null;
+            if (validStartTime) {
+                path = repositoryFiles.firstPath(c1.getStartNanos());
+            } else {
                 path = repositoryFiles.lastPath();
-            } else {
-                path = repositoryFiles.firstPath(chunkStartNanos);
             }
             if (path == null) { // closed
                 return;
             }
             chunkStartNanos = repositoryFiles.getTimestamp(path);
-            try (RecordingInput input = new RecordingInput(path.toFile())) {
+            try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
                 chunkParser = new ChunkParser(input, c1.getReuse());
+                long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
+                long start = validStartTime ? c1.getStartNanos() : segmentStart;
+                long end = c1.getEndTime() != null ? c1.getEndNanos() : Long.MAX_VALUE;
                 while (!isClosed()) {
                     boolean awaitnewEvent = false;
                     while (!isClosed() && !chunkParser.isChunkFinished()) {
@@ -87,7 +93,8 @@
                         boolean ordered = c2.getOrdered();
                         chunkParser.setReuse(c2.getReuse());
                         chunkParser.setOrdered(ordered);
-                        chunkParser.setFirstNanos(c2.getStartNanos());
+                        chunkParser.setFirstNanos(start);
+                        chunkParser.setLastNanos(end);
                         chunkParser.resetEventCache();
                         chunkParser.setParserFilter(c2.getFilter());
                         chunkParser.updateEventParsers();
@@ -98,7 +105,13 @@
                             awaitnewEvent = processUnordered(awaitnewEvent);
                         }
                         runFlushActions();
+                        if (segmentStart > end) {
+                            close();
+                            return;
+                        }
                     }
+
+
                     if (isClosed()) {
                         return;
                     }
@@ -110,11 +123,12 @@
                     chunkStartNanos = repositoryFiles.getTimestamp(path);
                     input.setFile(path);
                     chunkParser = chunkParser.newChunkParser();
+                    // No need filter when we reach new chunk
+                    // start = 0;
                 }
             }
         }
 
-
         private boolean processOrdered(boolean awaitNewEvents) throws IOException {
             if (sortedList == null) {
                 sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
@@ -169,9 +183,8 @@
 
     private final AbstractEventStream eventStream;
 
-    public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException {
-        eventStream = new DirectoryStream(acc, p);
-        eventStream.setStartTime(startTime);
+    public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess access, boolean active) throws IOException {
+        eventStream = new DirectoryStream(acc, p, access, active);
     }
 
     @Override
@@ -187,12 +200,12 @@
 
     @Override
     public void start() {
-        start(Instant.now().toEpochMilli() * 1000 * 1000L);
+        start(Utils.timeToNanos(Instant.now()));
     }
 
     @Override
     public void startAsync() {
-        startAsync(Instant.now().toEpochMilli() * 1000 * 1000L);
+        startAsync(Utils.timeToNanos(Instant.now()));
     }
 
     @Override
@@ -246,6 +259,12 @@
         eventStream.setStartTime(startTime);
     }
 
+    @Override
+    public void setEndTime(Instant endTime) {
+        eventStream.setEndTime(endTime);
+    }
+
+
     public void start(long startNanos) {
         eventStream.start(startNanos);
     }
@@ -253,4 +272,6 @@
     public void startAsync(long startNanos) {
         eventStream.startAsync(startNanos);
     }
+
+
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Fri Aug 09 01:18:18 2019 +0200
@@ -35,6 +35,7 @@
 import java.util.Objects;
 import java.util.function.Consumer;
 
+import jdk.jfr.internal.consumer.FileAccess;
 import jdk.jfr.internal.consumer.RecordingInput;
 
 /**
@@ -52,17 +53,32 @@
         private RecordedEvent[] sortedList;
 
         public FileStream(AccessControlContext acc, Path path) throws IOException {
-            super(acc);
-            this.input = new RecordingInput(path.toFile());
+            super(acc, false);
+            this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
 ;        }
 
         @Override
         public void process() throws IOException {
-            StreamConfiguration c1 = configuration;
+            final StreamConfiguration c1 = configuration;
+            long start = 0;
+            long end = Long.MAX_VALUE;
+            if (c1.getStartTime() != null) {
+                start = c1.getStartNanos();
+            }
+            if (c1.getEndTime() != null) {
+                end = c1.getEndNanos();
+            }
+
             chunkParser = new ChunkParser(input, c1.getReuse());
             while (!isClosed()) {
+                if (chunkParser.getStartNanos() > end) {
+                    close();
+                    return;
+                }
                 StreamConfiguration c2 = configuration;
                 boolean ordered = c2.getOrdered();
+                chunkParser.setFirstNanos(start);
+                chunkParser.setLastNanos(end);
                 chunkParser.setReuse(c2.getReuse());
                 chunkParser.setOrdered(ordered);
                 chunkParser.resetEventCache();
@@ -207,4 +223,9 @@
     public void setStartTime(Instant startTime) {
         eventStream.setStartTime(startTime);
     }
+
+    @Override
+    public void setEndTime(Instant endTime) {
+        eventStream.setEndTime(endTime);
+    }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Fri Aug 09 01:18:18 2019 +0200
@@ -55,6 +55,7 @@
     private int index;
     private boolean ordered;
     private long firstNanos;
+    private long lastNanos = Long.MAX_VALUE;
     private long thresholdNanos = -1;
 
     EventParser(TimeConverter timeConverter, EventType type, Parser[] parsers) {
@@ -120,8 +121,12 @@
             }
             endTicks += durationTicks;
         }
-        if (firstNanos > 0L) {
-            if (timeConverter.convertTimestamp(endTicks) < firstNanos) {
+        if (firstNanos != 0L || lastNanos != Long.MAX_VALUE) {
+            long eventEnd = timeConverter.convertTimestamp(endTicks);
+            if (eventEnd < firstNanos) {
+                return null;
+            }
+            if (eventEnd > lastNanos) {
                 return null;
             }
         }
@@ -173,6 +178,10 @@
         this.firstNanos = firstNanos;
     }
 
+    public void setLastNanos(long lastNanos) {
+        this.lastNanos = lastNanos;
+    }
+
     public void setOrdered(boolean ordered) {
         if (this.ordered == ordered) {
             return;
@@ -180,4 +189,5 @@
         this.ordered = ordered;
         this.index = 0;
     }
+
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Fri Aug 09 01:18:18 2019 +0200
@@ -27,15 +27,18 @@
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Objects;
 import java.util.function.Consumer;
 
-import jdk.jfr.internal.Repository;
+import jdk.jfr.internal.Utils;
+import jdk.jfr.internal.consumer.FileAccess;
 
 /**
- * Represents a stream of event that actions can be performed up on.
+ * Represents a stream of events.
  */
 public interface EventStream extends AutoCloseable {
 
@@ -50,16 +53,18 @@
      *
      * @throws IOException if a stream can't be opened, or an I/O error occurs
      *         when trying to access the repository
+     *
+     * @throws SecurityException if a security manager exists and the caller
+     *         does not have
+     *         {@code FlightRecorderPermission("accessFlightRecorder")}
      */
     public static EventStream openRepository() throws IOException {
-        Repository r = Repository.getRepository();
-        r.ensureRepository();
-        Path path = r.getRepositoryPath().toPath();
-        return new EventDirectoryStream(AccessController.getContext(), path, AbstractEventStream.NEXT_EVENT);
+        Utils.checkAccessFlightRecorder();
+        return new EventDirectoryStream(AccessController.getContext(), null, FileAccess.PRIVILIGED, false);
     }
 
     /**
-     * Creates a stream from a disk repository.
+     * Creates an event stream from a disk repository.
      * <p>
      * By default, the stream starts with the next event flushed by Flight
      * Recorder.
@@ -68,11 +73,17 @@
      *
      * @return an event stream, not {@code null}
      *
-      * @throws IOException if a stream can't be opened, or an I/O error occurs
+     * @throws IOException if a stream can't be opened, or an I/O error occurs
      *         when trying to access the repository
+     *
+     * @throws SecurityException if a security manager exists and its
+     *         {@code checkRead} method denies read access to the directory, or
+     *         files in the directory.
      */
     public static EventStream openRepository(Path directory) throws IOException {
-        return new EventDirectoryStream(AccessController.getContext(), directory, AbstractEventStream.NEXT_EVENT);
+        Objects.nonNull(directory);
+        AccessControlContext acc = AccessController.getContext();
+        return new EventDirectoryStream(acc, directory, FileAccess.UNPRIVILIGED, false);
     }
 
     /**
@@ -86,6 +97,9 @@
      *
      * @throws IOException if a stream can't be opened, or an I/O error occurs
      *         during reading
+     *
+     * @throws SecurityException if a security manager exists and its
+     *         {@code checkRead} method denies read access to the file
      */
     public static EventStream openFile(Path file) throws IOException {
         return new EventFileStream(file, null, null);
@@ -174,6 +188,8 @@
 
     /**
      * Specifies start time of the event stream.
+     * <p>
+     * The start time must be set before the stream is started.
      *
      * @param startTime the start time, not {@code null}
      *
@@ -182,6 +198,20 @@
     public void setStartTime(Instant startTime);
 
     /**
+     * Specifies end time of the event stream.
+     * <p>
+     * The end time must be set before the stream is started.
+     * <p>
+     * When the end time is reached the stream is terminated.
+     *
+     * @param endTime the end time, not {@code null}
+     *
+     * @throws IllegalStateException if the stream has already been started
+     */
+    public void setEndTime(Instant endTime);
+
+
+    /**
      * Start processing events in the stream.
      * <p>
      * All actions performed on this stream will happen in the current thread.
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java	Fri Aug 09 01:18:18 2019 +0200
@@ -39,6 +39,7 @@
 import jdk.jfr.internal.MetadataDescriptor;
 import jdk.jfr.internal.Type;
 import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.FileAccess;
 import jdk.jfr.internal.consumer.RecordingInput;
 
 /**
@@ -81,7 +82,7 @@
      */
     public RecordingFile(Path file) throws IOException {
         this.file = file.toFile();
-        this.input = new RecordingInput(this.file);
+        this.input = new RecordingInput(this.file, FileAccess.UNPRIVILIGED);
         findNext();
     }
 
@@ -134,7 +135,7 @@
         MetadataDescriptor previous = null;
         List<EventType> types = new ArrayList<>();
         HashSet<Long> foundIds = new HashSet<>();
-        try (RecordingInput ri = new RecordingInput(file)) {
+        try (RecordingInput ri = new RecordingInput(file, FileAccess.UNPRIVILIGED)) {
             ChunkHeader ch = new ChunkHeader(ri);
             aggregateEventTypeForChunk(ch, null, types, foundIds);
             while (!ch.isLastChunk()) {
@@ -150,7 +151,7 @@
         MetadataDescriptor previous = null;
         List<Type> types = new ArrayList<>();
         HashSet<Long> foundIds = new HashSet<>();
-        try (RecordingInput ri = new RecordingInput(file)) {
+        try (RecordingInput ri = new RecordingInput(file, FileAccess.UNPRIVILIGED)) {
             ChunkHeader ch = new ChunkHeader(ri);
             ch.awaitFinished();
             aggregateTypeForChunk(ch, null, types, foundIds);
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Fri Aug 09 01:18:18 2019 +0200
@@ -41,6 +41,7 @@
 import jdk.jfr.internal.PlatformRecording;
 import jdk.jfr.internal.PrivateAccess;
 import jdk.jfr.internal.Utils;
+import jdk.jfr.internal.consumer.FileAccess;
 
 /**
  * A recording stream produces events from the current JVM (Java Virtual
@@ -99,7 +100,7 @@
         this.recording = new Recording();
         this.recording.setFlushInterval(Duration.ofMillis(1000));
         try {
-            this.stream = new EventDirectoryStream(acc, null, null);
+            this.stream = new EventDirectoryStream(acc, null, FileAccess.PRIVILIGED, true);
         } catch (IOException ioe) {
             throw new IllegalStateException(ioe.getMessage());
         }
@@ -356,4 +357,9 @@
     public void setStartTime(Instant startTime) {
         stream.setStartTime(startTime);
     }
+
+    @Override
+    public void setEndTime(Instant endTime) {
+        stream.setStartTime(endTime);
+    }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java	Fri Aug 09 01:18:18 2019 +0200
@@ -43,6 +43,7 @@
 import java.lang.reflect.Modifier;
 import java.nio.file.Path;
 import java.time.Duration;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -619,4 +620,8 @@
         }
 
     }
+
+    public static long timeToNanos(Instant timestamp) {
+        return timestamp.getEpochSecond() * 1_000_000_000L + timestamp.getNano();
+    }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java	Fri Aug 09 01:18:18 2019 +0200
@@ -125,7 +125,7 @@
             byte fileState2 =  input.readPhysicalByte();
             if (fileState1 == fileState2) { // valid header
                 finished = fileState1 == 0;
-                if (constantPoolPosition != 0 && metadataPosition != 0) {
+                if (metadataPosition != 0) {
                     Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Setting input size to " + (absoluteChunkStart + chunkSize));
                     if (finished) {
                         // This assumes that the whole recording
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/FileAccess.java	Fri Aug 09 01:18:18 2019 +0200
@@ -0,0 +1,46 @@
+package jdk.jfr.internal.consumer;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+// Protected by modular boundaries.
+public abstract class FileAccess {
+    public final static FileAccess PRIVILIGED = new UnPriviliged();
+    // TODO: Should be changed Priviliged class
+    public final static FileAccess UNPRIVILIGED = new UnPriviliged();
+
+    abstract RandomAccessFile openRAF(File f, String mode) throws FileNotFoundException;
+    abstract DirectoryStream<Path> newDirectoryStream(Path repository) throws IOException;
+
+    static class Priviliged extends FileAccess {
+        @Override
+        RandomAccessFile openRAF(File f, String mode) {
+            // TDOO: Implement
+            return null;
+        }
+
+        @Override
+        protected DirectoryStream<Path> newDirectoryStream(Path repository) {
+            // TDOO: Implement
+            return null;
+        }
+    }
+
+    static class UnPriviliged extends FileAccess {
+        @Override
+        RandomAccessFile openRAF(File f, String mode) throws FileNotFoundException {
+            return new RandomAccessFile(f, mode);
+        }
+
+        @Override
+        DirectoryStream<Path> newDirectoryStream(Path dir) throws IOException {
+            return Files.newDirectoryStream(dir);
+        }
+
+    }
+}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java	Fri Aug 09 01:18:18 2019 +0200
@@ -64,24 +64,25 @@
             blockPositionEnd = 0;
         }
     }
-
+    private final int blockSize;
+    private final FileAccess fileAccess;
     private RandomAccessFile file;
     private String filename;
     private Block currentBlock = new Block();
     private Block previousBlock = new Block();
     private long position;
-    private final int blockSize;
     private long size = -1; // Fail fast if setSize(...) has not been called
                             // before parsing
 
-    public RecordingInput(File f, int blockSize) throws IOException {
+    public RecordingInput(File f, FileAccess fileAccess, int blockSize) throws IOException {
         this.blockSize = blockSize;
+        this.fileAccess = fileAccess;
         initialize(f);
     }
 
     private void initialize(File f) throws IOException {
         this.filename = f.getAbsolutePath().toString();
-        this.file = new RandomAccessFile(f, "r");
+        this.file = fileAccess.openRAF(f, "r");
         this.position = 0;
         this.size = -1;
         this.currentBlock.reset();
@@ -91,8 +92,8 @@
         }
     }
 
-    public RecordingInput(File f) throws IOException {
-        this(f, DEFAULT_BLOCK_SIZE);
+    public RecordingInput(File f, FileAccess fileAccess) throws IOException {
+        this(f, fileAccess, DEFAULT_BLOCK_SIZE);
     }
 
     public void positionPhysical(long position) throws IOException {
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java	Fri Aug 09 01:18:18 2019 +0200
@@ -46,13 +46,15 @@
 import jdk.jfr.internal.SecuritySupport.SafePath;
 
 public final class RepositoryFiles {
-    private final Path repository;
+    private final FileAccess fileAccess;
     private final NavigableMap<Long, Path> pathSet = new TreeMap<>();
     private final Map<Path, Long> pathLookup = new HashMap<>();
     private volatile boolean closed;
+    private Path repository;
 
-    public RepositoryFiles(SafePath repository) {
+    public RepositoryFiles(FileAccess fileAccess, SafePath repository) {
         this.repository = repository == null ? null : repository.toPath();
+        this.fileAccess = fileAccess;
     }
 
     public long getTimestamp(Path p) {
@@ -63,7 +65,7 @@
         // Wait for chunks
         while (!closed) {
             try {
-                if (updatePaths(repository)) {
+                if (updatePaths()) {
                     break;
                 }
             } catch (IOException e) {
@@ -101,7 +103,7 @@
                 }
             }
             try {
-                if (updatePaths(repository)) {
+                if (updatePaths()) {
                     continue;
                 }
             } catch (IOException e) {
@@ -122,14 +124,18 @@
         return null;
     }
 
-    private boolean updatePaths(Path repo) throws IOException {
-        if (repo == null) {
-            repo = Repository.getRepository().getRepositoryPath().toPath();
+    private boolean updatePaths() throws IOException {
+        if (repository == null) {
+            SafePath p = Repository.getRepository().getRepositoryPath();
+            if (p == null) {
+                return false;
+            }
+            repository = p.toPath();
         }
         boolean foundNew = false;
         List<Path> added = new ArrayList<>();
         Set<Path> current = new HashSet<>();
-        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo)) {
+        try (DirectoryStream<Path> dirStream = fileAccess.newDirectoryStream(repository)) {
             for (Path p : dirStream) {
                 if (!pathLookup.containsKey(p)) {
                     String s = p.toString();
@@ -169,7 +175,7 @@
     }
 
     private long readStartTime(Path p) throws IOException {
-        try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
+        try (RecordingInput in = new RecordingInput(p.toFile(), fileAccess, 100)) {
             ChunkHeader c = new ChunkHeader(in);
             return c.getStartNanos();
         }
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Disassemble.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Disassemble.java	Fri Aug 09 01:18:18 2019 +0200
@@ -41,6 +41,7 @@
 import java.util.List;
 
 import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.FileAccess;
 import jdk.jfr.internal.consumer.RecordingInput;
 
 final class Disassemble extends Command {
@@ -163,7 +164,7 @@
     }
 
     private List<Long> findChunkSizes(Path p) throws IOException {
-        try (RecordingInput input = new RecordingInput(p.toFile())) {
+        try (RecordingInput input = new RecordingInput(p.toFile(), FileAccess.UNPRIVILIGED)) {
             List<Long> sizes = new ArrayList<>();
             ChunkHeader ch = new ChunkHeader(input);
             sizes.add(ch.getSize());
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Summary.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Summary.java	Fri Aug 09 01:18:18 2019 +0200
@@ -42,6 +42,7 @@
 import jdk.jfr.internal.MetadataDescriptor;
 import jdk.jfr.internal.Type;
 import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.FileAccess;
 import jdk.jfr.internal.consumer.RecordingInput;
 
 final class Summary extends Command {
@@ -91,7 +92,7 @@
         long totalDuration = 0;
         long chunks = 0;
 
-        try (RecordingInput input = new RecordingInput(p.toFile())) {
+        try (RecordingInput input = new RecordingInput(p.toFile(), FileAccess.UNPRIVILIGED)) {
             ChunkHeader first = new ChunkHeader(input);
             ChunkHeader ch = first;
             String eventPrefix = Type.EVENT_NAME_PREFIX;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetEndTime.java	Fri Aug 09 01:18:18 2019 +0200
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package jdk.jfr.api.consumer.recordingstream;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import jdk.jfr.Event;
+import jdk.jfr.Name;
+import jdk.jfr.Recording;
+import jdk.jfr.StackTrace;
+import jdk.jfr.consumer.EventStream;
+
+/**
+ * @test
+ * @summary Tests EventStream::setEndTime
+ * @key jfr
+ * @requires vm.hasJFR
+ * @library /test/lib
+ * @run main/othervm jdk.jfr.api.consumer.recordingstream.TestSetEndTime
+ */
+public final class TestSetEndTime {
+
+    @Name("Mark")
+    @StackTrace(false)
+    public final static class Mark extends Event {
+        public boolean before;
+    }
+
+    public static void main(String... args) throws Exception {
+        try (Recording r = new Recording()) {
+            r.setFlushInterval(Duration.ofSeconds(1));
+            r.start();
+            Instant start = Instant.now();
+            System.out.println("Instant.start() = " + start);
+            Thread.sleep(2000);
+            Mark event1 = new Mark();
+            event1.before = true;
+            event1.commit();
+            Thread.sleep(2000);
+            Instant end = Instant.now();
+            System.out.println("Instant.end() = " + end);
+            Thread.sleep(2000);
+            Mark event2 = new Mark();
+            event2.before = false;
+            event2.commit();
+            AtomicBoolean error = new AtomicBoolean(true);
+            try (EventStream d = EventStream.openRepository()) {
+                d.setStartTime(start); // needed so we don't start after end time
+                d.setEndTime(end);
+                d.onEvent(e -> {
+                    System.out.println(e);
+                    boolean before = e.getBoolean("before");
+                    if (before) {
+                        error.set(false);
+                    } else {
+                        error.set(true);
+                    }
+                });
+                d.start();
+                if (error.get()) {
+                    throw new Exception("Found unexpected event!");
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetStartTime.java	Mon Aug 05 23:57:47 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetStartTime.java	Fri Aug 09 01:18:18 2019 +0200
@@ -36,7 +36,7 @@
 
 /**
  * @test
- * @summary Tests RecordingStrream::setStartTime
+ * @summary Tests EventStream::setStartTime
  * @key jfr
  * @requires vm.hasJFR
  * @library /test/lib
@@ -57,9 +57,10 @@
             Mark event1 = new Mark();
             event1.before = true;
             event1.commit();
-
+            Thread.sleep(2000);
             Instant now = Instant.now();
-            System.out.println("Start time was " + now);
+            System.out.println("Instant.now() = " + now);
+            Thread.sleep(2000);
             Mark event2 = new Mark();
             event2.before = false;
             event2.commit();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java	Fri Aug 09 01:18:18 2019 +0200
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.api.consumer.streaming;
+
+import java.util.concurrent.CountDownLatch;
+
+import jdk.jfr.Event;
+import jdk.jfr.Name;
+import jdk.jfr.Recording;
+import jdk.jfr.consumer.EventStream;
+import jdk.jfr.consumer.RecordingStream;
+
+/**
+ * @test
+ * @summary Verifies that a stream from a repository starts at the latest event
+ * @key jfr
+ * @requires vm.hasJFR
+ * @library /test/lib
+ * @run main/othervm jdk.jfr.api.consumer.streaming.TestLatestEvent
+ */
+public class TestLatestEvent {
+
+    @Name("Chunk")
+    static class ChunkEvent extends Event {
+        boolean end;
+    }
+
+    public static void main(String... args) throws Exception {
+        try (RecordingStream r = new RecordingStream()) {
+            r.startAsync();
+            // Create chunks with events in the repository
+            for (int i = 0; i < 5; i++) {
+                try (Recording r1 = new Recording()) {
+                    r1.start();
+                    ChunkEvent e = new ChunkEvent();
+                    e.end = false;
+                    e.commit();
+                    r1.stop();
+                }
+            }
+            CountDownLatch endEventRecevied = new CountDownLatch(1);
+            CountDownLatch emitEvent = new CountDownLatch(1);
+            try (EventStream s = EventStream.openRepository()) {
+                s.onEvent("Chunk", e -> {
+                    if (e.getBoolean("end")) {
+                        endEventRecevied.countDown();
+                        return;
+                    }
+                    System.out.println("Stream should start at latest event:");
+                    System.out.println(e);
+                });
+
+                ChunkEvent e1 = new ChunkEvent();
+                e1.end = false;
+                e1.commit();
+                s.startAsync();
+                s.onFlush(() -> {
+                    emitEvent.countDown();
+                });
+                emitEvent.await();
+                ChunkEvent e2 = new ChunkEvent();
+                e2.end = true;
+                e2.commit();
+
+                endEventRecevied.await();
+            }
+        }
+    }
+}