Add EventStream::setEndTime(...) and a first stab at priviliged access to local repository
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Fri Aug 09 01:18:18 2019 +0200
@@ -45,6 +45,7 @@
import jdk.jfr.internal.LogTag;
import jdk.jfr.internal.Logger;
import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.InternalEventFilter;
/*
@@ -71,10 +72,11 @@
private boolean reuse = true;
private boolean ordered = true;
private Instant startTime = null;
+ private Instant endTime = null;
private boolean started = false;
private long startNanos = 0;
+ private long endNanos = Long.MAX_VALUE;
private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
-
private boolean changed = false;
public StreamConfiguration(StreamConfiguration configuration) {
@@ -86,8 +88,10 @@
this.reuse = configuration.reuse;
this.ordered = configuration.ordered;
this.startTime = configuration.startTime;
+ this.endTime = configuration.endTime;
this.started = configuration.started;
this.startNanos = configuration.startNanos;
+ this.endNanos = configuration.endNanos;
this.dispatcherLookup = configuration.dispatcherLookup;
}
@@ -197,9 +201,16 @@
changed = true;
return this;
}
+ public StreamConfiguration setEndTime(Instant endTime) {
+ this.endTime = endTime;
+ this.endNanos = Utils.timeToNanos(endTime);
+ changed = true;
+ return this;
+ }
final public StreamConfiguration setStartTime(Instant startTime) {
this.startTime = startTime;
+ this.startNanos = Utils.timeToNanos(startTime);
changed = true;
return this;
}
@@ -208,6 +219,10 @@
return startTime;
}
+ public Object getEndTime() {
+ return endTime;
+ }
+
final public boolean isStarted() {
return started;
}
@@ -243,6 +258,10 @@
return startNanos;
}
+ final public long getEndNanos() {
+ return endNanos;
+ }
+
final public InternalEventFilter getFilter() {
return eventFilter;
}
@@ -264,12 +283,18 @@
sb.append("Started: ").append(started).append("\n");
sb.append("Start Time: ").append(startTime).append("\n");
sb.append("Start Nanos: ").append(startNanos).append("\n");
+ sb.append("End Time: ").append(endTime).append("\n");
+ sb.append("End Nanos: ").append(endNanos).append("\n");
return sb.toString();
}
private EventDispatcher[] getDispatchers() {
return dispatchers;
}
+
+
+
+
}
final static class EventDispatcher {
@@ -296,22 +321,22 @@
}
}
- public final static Instant NEXT_EVENT = Instant.now();
public final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
private final AccessControlContext accessControlContext;
private final Thread thread;
-
- // Update bu updateConfiguration()
+ private final boolean active;
+ // Update by updateConfiguration()
protected StreamConfiguration configuration = new StreamConfiguration();
// Cache the last event type and dispatch.
private EventType lastEventType;
private EventDispatcher[] lastEventDispatch;
- public AbstractEventStream(AccessControlContext acc) throws IOException {
+ public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
this.accessControlContext = acc;
+ this.active = active;
// Create thread object in constructor to ensure caller has
// permission before constructing object
thread = new Thread(this);
@@ -460,16 +485,21 @@
if (configuration.isStarted()) {
throw new IllegalStateException("Stream is already started");
}
- if (startTime == null) {
- return;
- }
if (startTime.isBefore(Instant.EPOCH)) {
startTime = Instant.EPOCH;
}
updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
}
- private boolean updateConfiguration(StreamConfiguration newConfiguration) {
+ public final void setEndTime(Instant endTime) {
+ if (configuration.isStarted()) {
+ throw new IllegalStateException("Stream is already started");
+ }
+ updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
+}
+
+
+ protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
// Changes to the configuration must be serialized, so make
// sure that we have the monitor
Thread.holdsLock(this);
@@ -490,32 +520,27 @@
}
public final void startAsync(long startNanos) {
+ startInternal(startNanos);
+ thread.start();
+ }
+
+ public final void start(long startNanos) {
+ startInternal(startNanos);
+ run();
+ }
+
+ private void startInternal(long startNanos) {
synchronized (this) {
if (configuration.isStarted()) {
throw new IllegalStateException("Event stream can only be started once");
}
StreamConfiguration c = new StreamConfiguration(configuration);
- c.setStartNanos(startNanos);
+ if (active) {
+ c.setStartNanos(startNanos);
+ }
c.setStarted(true);
updateConfiguration(c);
}
- thread.start();
- }
-
- public final void start(long startNanos) {
- synchronized (this) {
- if (configuration.isStarted()) {
- throw new IllegalStateException("Event stream can only be started once");
- }
- StreamConfiguration c = new StreamConfiguration(configuration);
- if (c.getStartTime() != null) {
- startNanos= c.getStartTime().toEpochMilli() * 1_000_000L;
- }
- c.setStartNanos(startNanos);
- c.setStarted(true);
- updateConfiguration(c);
- }
- run();
}
public final void awaitTermination(Duration timeout) {
@@ -534,4 +559,5 @@
}
abstract public void close();
+
}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri Aug 09 01:18:18 2019 +0200
@@ -64,7 +64,8 @@
private boolean reuse;
private boolean ordered;
private boolean resetEventCache;
- private long firstNanos;
+ private long firstNanos = 0;
+ private long lastNanos = Long.MAX_VALUE;
public ChunkParser(RecordingInput input, boolean reuse) throws IOException {
this(new ChunkHeader(input), null, 1000);
@@ -118,7 +119,6 @@
return this.eventFilter;
}
-
/**
* Reads an event and returns null when segment or chunk ends.
*
@@ -373,6 +373,10 @@
}
this.firstNanos = firstNanos;
}
+ public void setLastNanos(long lastNanos) {
+ this.lastNanos = lastNanos;
+ }
+
// Need to call updateEventParsers() for
// change to take effect
@@ -388,6 +392,7 @@
ep.setOrdered(ordered);
ep.setReuse(reuse);
ep.setFirstNanos(firstNanos);
+ ep.setLastNanos(lastNanos);
if (resetEventCache) {
ep.resetCache();
}
@@ -407,4 +412,10 @@
public long getChunkDuration() {
return chunkHeader.getDurationNanos();
}
+
+ public long getStartNanos() {
+ return chunkHeader.getStartNanos();
+ }
+
+
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Aug 09 01:18:18 2019 +0200
@@ -36,6 +36,8 @@
import java.util.function.Consumer;
import jdk.jfr.internal.SecuritySupport.SafePath;
+import jdk.jfr.internal.Utils;
+import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.RecordingInput;
import jdk.jfr.internal.consumer.RepositoryFiles;
@@ -44,7 +46,7 @@
* with chunk files.
*
*/
-final class EventDirectoryStream implements EventStream {
+class EventDirectoryStream implements EventStream {
static final class DirectoryStream extends AbstractEventStream {
@@ -52,34 +54,38 @@
private static final int DEFAULT_ARRAY_SIZE = 10_000;
private final RepositoryFiles repositoryFiles;
-
+ private final boolean active;
+ private final FileAccess fileAccess;
private ChunkParser chunkParser;
private RecordedEvent[] sortedList;
protected long chunkStartNanos;
- public DirectoryStream(AccessControlContext acc, Path p) throws IOException {
- super(acc);
- repositoryFiles = new RepositoryFiles(p == null ? null : new SafePath(p));
+ public DirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
+ super(acc, active);
+ this.fileAccess = fileAccess;
+ this.active = active;
+ repositoryFiles = new RepositoryFiles(fileAccess, p == null ? null : new SafePath(p));
}
@Override
public void process() throws IOException {
- StreamConfiguration c1 = configuration;
- chunkStartNanos = c1.getStartNanos();
+ final StreamConfiguration c1 = configuration;
Path path;
- if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) {
- // TODO: Need to wait for next segment to arrive and then
- // use first event, but this will do for.
+ boolean validStartTime = active || c1.getStartTime() != null;
+ if (validStartTime) {
+ path = repositoryFiles.firstPath(c1.getStartNanos());
+ } else {
path = repositoryFiles.lastPath();
- } else {
- path = repositoryFiles.firstPath(chunkStartNanos);
}
if (path == null) { // closed
return;
}
chunkStartNanos = repositoryFiles.getTimestamp(path);
- try (RecordingInput input = new RecordingInput(path.toFile())) {
+ try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
chunkParser = new ChunkParser(input, c1.getReuse());
+ long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
+ long start = validStartTime ? c1.getStartNanos() : segmentStart;
+ long end = c1.getEndTime() != null ? c1.getEndNanos() : Long.MAX_VALUE;
while (!isClosed()) {
boolean awaitnewEvent = false;
while (!isClosed() && !chunkParser.isChunkFinished()) {
@@ -87,7 +93,8 @@
boolean ordered = c2.getOrdered();
chunkParser.setReuse(c2.getReuse());
chunkParser.setOrdered(ordered);
- chunkParser.setFirstNanos(c2.getStartNanos());
+ chunkParser.setFirstNanos(start);
+ chunkParser.setLastNanos(end);
chunkParser.resetEventCache();
chunkParser.setParserFilter(c2.getFilter());
chunkParser.updateEventParsers();
@@ -98,7 +105,13 @@
awaitnewEvent = processUnordered(awaitnewEvent);
}
runFlushActions();
+ if (segmentStart > end) {
+ close();
+ return;
+ }
}
+
+
if (isClosed()) {
return;
}
@@ -110,11 +123,12 @@
chunkStartNanos = repositoryFiles.getTimestamp(path);
input.setFile(path);
chunkParser = chunkParser.newChunkParser();
+ // No need filter when we reach new chunk
+ // start = 0;
}
}
}
-
private boolean processOrdered(boolean awaitNewEvents) throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
@@ -169,9 +183,8 @@
private final AbstractEventStream eventStream;
- public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException {
- eventStream = new DirectoryStream(acc, p);
- eventStream.setStartTime(startTime);
+ public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess access, boolean active) throws IOException {
+ eventStream = new DirectoryStream(acc, p, access, active);
}
@Override
@@ -187,12 +200,12 @@
@Override
public void start() {
- start(Instant.now().toEpochMilli() * 1000 * 1000L);
+ start(Utils.timeToNanos(Instant.now()));
}
@Override
public void startAsync() {
- startAsync(Instant.now().toEpochMilli() * 1000 * 1000L);
+ startAsync(Utils.timeToNanos(Instant.now()));
}
@Override
@@ -246,6 +259,12 @@
eventStream.setStartTime(startTime);
}
+ @Override
+ public void setEndTime(Instant endTime) {
+ eventStream.setEndTime(endTime);
+ }
+
+
public void start(long startNanos) {
eventStream.start(startNanos);
}
@@ -253,4 +272,6 @@
public void startAsync(long startNanos) {
eventStream.startAsync(startNanos);
}
+
+
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Aug 09 01:18:18 2019 +0200
@@ -35,6 +35,7 @@
import java.util.Objects;
import java.util.function.Consumer;
+import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.RecordingInput;
/**
@@ -52,17 +53,32 @@
private RecordedEvent[] sortedList;
public FileStream(AccessControlContext acc, Path path) throws IOException {
- super(acc);
- this.input = new RecordingInput(path.toFile());
+ super(acc, false);
+ this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
; }
@Override
public void process() throws IOException {
- StreamConfiguration c1 = configuration;
+ final StreamConfiguration c1 = configuration;
+ long start = 0;
+ long end = Long.MAX_VALUE;
+ if (c1.getStartTime() != null) {
+ start = c1.getStartNanos();
+ }
+ if (c1.getEndTime() != null) {
+ end = c1.getEndNanos();
+ }
+
chunkParser = new ChunkParser(input, c1.getReuse());
while (!isClosed()) {
+ if (chunkParser.getStartNanos() > end) {
+ close();
+ return;
+ }
StreamConfiguration c2 = configuration;
boolean ordered = c2.getOrdered();
+ chunkParser.setFirstNanos(start);
+ chunkParser.setLastNanos(end);
chunkParser.setReuse(c2.getReuse());
chunkParser.setOrdered(ordered);
chunkParser.resetEventCache();
@@ -207,4 +223,9 @@
public void setStartTime(Instant startTime) {
eventStream.setStartTime(startTime);
}
+
+ @Override
+ public void setEndTime(Instant endTime) {
+ eventStream.setEndTime(endTime);
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Fri Aug 09 01:18:18 2019 +0200
@@ -55,6 +55,7 @@
private int index;
private boolean ordered;
private long firstNanos;
+ private long lastNanos = Long.MAX_VALUE;
private long thresholdNanos = -1;
EventParser(TimeConverter timeConverter, EventType type, Parser[] parsers) {
@@ -120,8 +121,12 @@
}
endTicks += durationTicks;
}
- if (firstNanos > 0L) {
- if (timeConverter.convertTimestamp(endTicks) < firstNanos) {
+ if (firstNanos != 0L || lastNanos != Long.MAX_VALUE) {
+ long eventEnd = timeConverter.convertTimestamp(endTicks);
+ if (eventEnd < firstNanos) {
+ return null;
+ }
+ if (eventEnd > lastNanos) {
return null;
}
}
@@ -173,6 +178,10 @@
this.firstNanos = firstNanos;
}
+ public void setLastNanos(long lastNanos) {
+ this.lastNanos = lastNanos;
+ }
+
public void setOrdered(boolean ordered) {
if (this.ordered == ordered) {
return;
@@ -180,4 +189,5 @@
this.ordered = ordered;
this.index = 0;
}
+
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri Aug 09 01:18:18 2019 +0200
@@ -27,15 +27,18 @@
import java.io.IOException;
import java.nio.file.Path;
+import java.security.AccessControlContext;
import java.security.AccessController;
import java.time.Duration;
import java.time.Instant;
+import java.util.Objects;
import java.util.function.Consumer;
-import jdk.jfr.internal.Repository;
+import jdk.jfr.internal.Utils;
+import jdk.jfr.internal.consumer.FileAccess;
/**
- * Represents a stream of event that actions can be performed up on.
+ * Represents a stream of events.
*/
public interface EventStream extends AutoCloseable {
@@ -50,16 +53,18 @@
*
* @throws IOException if a stream can't be opened, or an I/O error occurs
* when trying to access the repository
+ *
+ * @throws SecurityException if a security manager exists and the caller
+ * does not have
+ * {@code FlightRecorderPermission("accessFlightRecorder")}
*/
public static EventStream openRepository() throws IOException {
- Repository r = Repository.getRepository();
- r.ensureRepository();
- Path path = r.getRepositoryPath().toPath();
- return new EventDirectoryStream(AccessController.getContext(), path, AbstractEventStream.NEXT_EVENT);
+ Utils.checkAccessFlightRecorder();
+ return new EventDirectoryStream(AccessController.getContext(), null, FileAccess.PRIVILIGED, false);
}
/**
- * Creates a stream from a disk repository.
+ * Creates an event stream from a disk repository.
* <p>
* By default, the stream starts with the next event flushed by Flight
* Recorder.
@@ -68,11 +73,17 @@
*
* @return an event stream, not {@code null}
*
- * @throws IOException if a stream can't be opened, or an I/O error occurs
+ * @throws IOException if a stream can't be opened, or an I/O error occurs
* when trying to access the repository
+ *
+ * @throws SecurityException if a security manager exists and its
+ * {@code checkRead} method denies read access to the directory, or
+ * files in the directory.
*/
public static EventStream openRepository(Path directory) throws IOException {
- return new EventDirectoryStream(AccessController.getContext(), directory, AbstractEventStream.NEXT_EVENT);
+ Objects.nonNull(directory);
+ AccessControlContext acc = AccessController.getContext();
+ return new EventDirectoryStream(acc, directory, FileAccess.UNPRIVILIGED, false);
}
/**
@@ -86,6 +97,9 @@
*
* @throws IOException if a stream can't be opened, or an I/O error occurs
* during reading
+ *
+ * @throws SecurityException if a security manager exists and its
+ * {@code checkRead} method denies read access to the file
*/
public static EventStream openFile(Path file) throws IOException {
return new EventFileStream(file, null, null);
@@ -174,6 +188,8 @@
/**
* Specifies start time of the event stream.
+ * <p>
+ * The start time must be set before the stream is started.
*
* @param startTime the start time, not {@code null}
*
@@ -182,6 +198,20 @@
public void setStartTime(Instant startTime);
/**
+ * Specifies end time of the event stream.
+ * <p>
+ * The end time must be set before the stream is started.
+ * <p>
+ * When the end time is reached the stream is terminated.
+ *
+ * @param endTime the end time, not {@code null}
+ *
+ * @throws IllegalStateException if the stream has already been started
+ */
+ public void setEndTime(Instant endTime);
+
+
+ /**
* Start processing events in the stream.
* <p>
* All actions performed on this stream will happen in the current thread.
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java Fri Aug 09 01:18:18 2019 +0200
@@ -39,6 +39,7 @@
import jdk.jfr.internal.MetadataDescriptor;
import jdk.jfr.internal.Type;
import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.RecordingInput;
/**
@@ -81,7 +82,7 @@
*/
public RecordingFile(Path file) throws IOException {
this.file = file.toFile();
- this.input = new RecordingInput(this.file);
+ this.input = new RecordingInput(this.file, FileAccess.UNPRIVILIGED);
findNext();
}
@@ -134,7 +135,7 @@
MetadataDescriptor previous = null;
List<EventType> types = new ArrayList<>();
HashSet<Long> foundIds = new HashSet<>();
- try (RecordingInput ri = new RecordingInput(file)) {
+ try (RecordingInput ri = new RecordingInput(file, FileAccess.UNPRIVILIGED)) {
ChunkHeader ch = new ChunkHeader(ri);
aggregateEventTypeForChunk(ch, null, types, foundIds);
while (!ch.isLastChunk()) {
@@ -150,7 +151,7 @@
MetadataDescriptor previous = null;
List<Type> types = new ArrayList<>();
HashSet<Long> foundIds = new HashSet<>();
- try (RecordingInput ri = new RecordingInput(file)) {
+ try (RecordingInput ri = new RecordingInput(file, FileAccess.UNPRIVILIGED)) {
ChunkHeader ch = new ChunkHeader(ri);
ch.awaitFinished();
aggregateTypeForChunk(ch, null, types, foundIds);
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Fri Aug 09 01:18:18 2019 +0200
@@ -41,6 +41,7 @@
import jdk.jfr.internal.PlatformRecording;
import jdk.jfr.internal.PrivateAccess;
import jdk.jfr.internal.Utils;
+import jdk.jfr.internal.consumer.FileAccess;
/**
* A recording stream produces events from the current JVM (Java Virtual
@@ -99,7 +100,7 @@
this.recording = new Recording();
this.recording.setFlushInterval(Duration.ofMillis(1000));
try {
- this.stream = new EventDirectoryStream(acc, null, null);
+ this.stream = new EventDirectoryStream(acc, null, FileAccess.PRIVILIGED, true);
} catch (IOException ioe) {
throw new IllegalStateException(ioe.getMessage());
}
@@ -356,4 +357,9 @@
public void setStartTime(Instant startTime) {
stream.setStartTime(startTime);
}
+
+ @Override
+ public void setEndTime(Instant endTime) {
+ stream.setStartTime(endTime);
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java Fri Aug 09 01:18:18 2019 +0200
@@ -43,6 +43,7 @@
import java.lang.reflect.Modifier;
import java.nio.file.Path;
import java.time.Duration;
+import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
@@ -619,4 +620,8 @@
}
}
+
+ public static long timeToNanos(Instant timestamp) {
+ return timestamp.getEpochSecond() * 1_000_000_000L + timestamp.getNano();
+ }
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkHeader.java Fri Aug 09 01:18:18 2019 +0200
@@ -125,7 +125,7 @@
byte fileState2 = input.readPhysicalByte();
if (fileState1 == fileState2) { // valid header
finished = fileState1 == 0;
- if (constantPoolPosition != 0 && metadataPosition != 0) {
+ if (metadataPosition != 0) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Setting input size to " + (absoluteChunkStart + chunkSize));
if (finished) {
// This assumes that the whole recording
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/FileAccess.java Fri Aug 09 01:18:18 2019 +0200
@@ -0,0 +1,46 @@
+package jdk.jfr.internal.consumer;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+// Protected by modular boundaries.
+public abstract class FileAccess {
+ public final static FileAccess PRIVILIGED = new UnPriviliged();
+ // TODO: Should be changed Priviliged class
+ public final static FileAccess UNPRIVILIGED = new UnPriviliged();
+
+ abstract RandomAccessFile openRAF(File f, String mode) throws FileNotFoundException;
+ abstract DirectoryStream<Path> newDirectoryStream(Path repository) throws IOException;
+
+ static class Priviliged extends FileAccess {
+ @Override
+ RandomAccessFile openRAF(File f, String mode) {
+ // TDOO: Implement
+ return null;
+ }
+
+ @Override
+ protected DirectoryStream<Path> newDirectoryStream(Path repository) {
+ // TDOO: Implement
+ return null;
+ }
+ }
+
+ static class UnPriviliged extends FileAccess {
+ @Override
+ RandomAccessFile openRAF(File f, String mode) throws FileNotFoundException {
+ return new RandomAccessFile(f, mode);
+ }
+
+ @Override
+ DirectoryStream<Path> newDirectoryStream(Path dir) throws IOException {
+ return Files.newDirectoryStream(dir);
+ }
+
+ }
+}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RecordingInput.java Fri Aug 09 01:18:18 2019 +0200
@@ -64,24 +64,25 @@
blockPositionEnd = 0;
}
}
-
+ private final int blockSize;
+ private final FileAccess fileAccess;
private RandomAccessFile file;
private String filename;
private Block currentBlock = new Block();
private Block previousBlock = new Block();
private long position;
- private final int blockSize;
private long size = -1; // Fail fast if setSize(...) has not been called
// before parsing
- public RecordingInput(File f, int blockSize) throws IOException {
+ public RecordingInput(File f, FileAccess fileAccess, int blockSize) throws IOException {
this.blockSize = blockSize;
+ this.fileAccess = fileAccess;
initialize(f);
}
private void initialize(File f) throws IOException {
this.filename = f.getAbsolutePath().toString();
- this.file = new RandomAccessFile(f, "r");
+ this.file = fileAccess.openRAF(f, "r");
this.position = 0;
this.size = -1;
this.currentBlock.reset();
@@ -91,8 +92,8 @@
}
}
- public RecordingInput(File f) throws IOException {
- this(f, DEFAULT_BLOCK_SIZE);
+ public RecordingInput(File f, FileAccess fileAccess) throws IOException {
+ this(f, fileAccess, DEFAULT_BLOCK_SIZE);
}
public void positionPhysical(long position) throws IOException {
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Fri Aug 09 01:18:18 2019 +0200
@@ -46,13 +46,15 @@
import jdk.jfr.internal.SecuritySupport.SafePath;
public final class RepositoryFiles {
- private final Path repository;
+ private final FileAccess fileAccess;
private final NavigableMap<Long, Path> pathSet = new TreeMap<>();
private final Map<Path, Long> pathLookup = new HashMap<>();
private volatile boolean closed;
+ private Path repository;
- public RepositoryFiles(SafePath repository) {
+ public RepositoryFiles(FileAccess fileAccess, SafePath repository) {
this.repository = repository == null ? null : repository.toPath();
+ this.fileAccess = fileAccess;
}
public long getTimestamp(Path p) {
@@ -63,7 +65,7 @@
// Wait for chunks
while (!closed) {
try {
- if (updatePaths(repository)) {
+ if (updatePaths()) {
break;
}
} catch (IOException e) {
@@ -101,7 +103,7 @@
}
}
try {
- if (updatePaths(repository)) {
+ if (updatePaths()) {
continue;
}
} catch (IOException e) {
@@ -122,14 +124,18 @@
return null;
}
- private boolean updatePaths(Path repo) throws IOException {
- if (repo == null) {
- repo = Repository.getRepository().getRepositoryPath().toPath();
+ private boolean updatePaths() throws IOException {
+ if (repository == null) {
+ SafePath p = Repository.getRepository().getRepositoryPath();
+ if (p == null) {
+ return false;
+ }
+ repository = p.toPath();
}
boolean foundNew = false;
List<Path> added = new ArrayList<>();
Set<Path> current = new HashSet<>();
- try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo)) {
+ try (DirectoryStream<Path> dirStream = fileAccess.newDirectoryStream(repository)) {
for (Path p : dirStream) {
if (!pathLookup.containsKey(p)) {
String s = p.toString();
@@ -169,7 +175,7 @@
}
private long readStartTime(Path p) throws IOException {
- try (RecordingInput in = new RecordingInput(p.toFile(), 100)) {
+ try (RecordingInput in = new RecordingInput(p.toFile(), fileAccess, 100)) {
ChunkHeader c = new ChunkHeader(in);
return c.getStartNanos();
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Disassemble.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Disassemble.java Fri Aug 09 01:18:18 2019 +0200
@@ -41,6 +41,7 @@
import java.util.List;
import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.RecordingInput;
final class Disassemble extends Command {
@@ -163,7 +164,7 @@
}
private List<Long> findChunkSizes(Path p) throws IOException {
- try (RecordingInput input = new RecordingInput(p.toFile())) {
+ try (RecordingInput input = new RecordingInput(p.toFile(), FileAccess.UNPRIVILIGED)) {
List<Long> sizes = new ArrayList<>();
ChunkHeader ch = new ChunkHeader(input);
sizes.add(ch.getSize());
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Summary.java Mon Aug 05 23:57:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/tool/Summary.java Fri Aug 09 01:18:18 2019 +0200
@@ -42,6 +42,7 @@
import jdk.jfr.internal.MetadataDescriptor;
import jdk.jfr.internal.Type;
import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.RecordingInput;
final class Summary extends Command {
@@ -91,7 +92,7 @@
long totalDuration = 0;
long chunks = 0;
- try (RecordingInput input = new RecordingInput(p.toFile())) {
+ try (RecordingInput input = new RecordingInput(p.toFile(), FileAccess.UNPRIVILIGED)) {
ChunkHeader first = new ChunkHeader(input);
ChunkHeader ch = first;
String eventPrefix = Type.EVENT_NAME_PREFIX;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetEndTime.java Fri Aug 09 01:18:18 2019 +0200
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package jdk.jfr.api.consumer.recordingstream;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import jdk.jfr.Event;
+import jdk.jfr.Name;
+import jdk.jfr.Recording;
+import jdk.jfr.StackTrace;
+import jdk.jfr.consumer.EventStream;
+
+/**
+ * @test
+ * @summary Tests EventStream::setEndTime
+ * @key jfr
+ * @requires vm.hasJFR
+ * @library /test/lib
+ * @run main/othervm jdk.jfr.api.consumer.recordingstream.TestSetEndTime
+ */
+public final class TestSetEndTime {
+
+ @Name("Mark")
+ @StackTrace(false)
+ public final static class Mark extends Event {
+ public boolean before;
+ }
+
+ public static void main(String... args) throws Exception {
+ try (Recording r = new Recording()) {
+ r.setFlushInterval(Duration.ofSeconds(1));
+ r.start();
+ Instant start = Instant.now();
+ System.out.println("Instant.start() = " + start);
+ Thread.sleep(2000);
+ Mark event1 = new Mark();
+ event1.before = true;
+ event1.commit();
+ Thread.sleep(2000);
+ Instant end = Instant.now();
+ System.out.println("Instant.end() = " + end);
+ Thread.sleep(2000);
+ Mark event2 = new Mark();
+ event2.before = false;
+ event2.commit();
+ AtomicBoolean error = new AtomicBoolean(true);
+ try (EventStream d = EventStream.openRepository()) {
+ d.setStartTime(start); // needed so we don't start after end time
+ d.setEndTime(end);
+ d.onEvent(e -> {
+ System.out.println(e);
+ boolean before = e.getBoolean("before");
+ if (before) {
+ error.set(false);
+ } else {
+ error.set(true);
+ }
+ });
+ d.start();
+ if (error.get()) {
+ throw new Exception("Found unexpected event!");
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetStartTime.java Mon Aug 05 23:57:47 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestSetStartTime.java Fri Aug 09 01:18:18 2019 +0200
@@ -36,7 +36,7 @@
/**
* @test
- * @summary Tests RecordingStrream::setStartTime
+ * @summary Tests EventStream::setStartTime
* @key jfr
* @requires vm.hasJFR
* @library /test/lib
@@ -57,9 +57,10 @@
Mark event1 = new Mark();
event1.before = true;
event1.commit();
-
+ Thread.sleep(2000);
Instant now = Instant.now();
- System.out.println("Start time was " + now);
+ System.out.println("Instant.now() = " + now);
+ Thread.sleep(2000);
Mark event2 = new Mark();
event2.before = false;
event2.commit();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/jdk/jfr/api/consumer/streaming/TestLatestEvent.java Fri Aug 09 01:18:18 2019 +0200
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.api.consumer.streaming;
+
+import java.util.concurrent.CountDownLatch;
+
+import jdk.jfr.Event;
+import jdk.jfr.Name;
+import jdk.jfr.Recording;
+import jdk.jfr.consumer.EventStream;
+import jdk.jfr.consumer.RecordingStream;
+
+/**
+ * @test
+ * @summary Verifies that a stream from a repository starts at the latest event
+ * @key jfr
+ * @requires vm.hasJFR
+ * @library /test/lib
+ * @run main/othervm jdk.jfr.api.consumer.streaming.TestLatestEvent
+ */
+public class TestLatestEvent {
+
+ @Name("Chunk")
+ static class ChunkEvent extends Event {
+ boolean end;
+ }
+
+ public static void main(String... args) throws Exception {
+ try (RecordingStream r = new RecordingStream()) {
+ r.startAsync();
+ // Create chunks with events in the repository
+ for (int i = 0; i < 5; i++) {
+ try (Recording r1 = new Recording()) {
+ r1.start();
+ ChunkEvent e = new ChunkEvent();
+ e.end = false;
+ e.commit();
+ r1.stop();
+ }
+ }
+ CountDownLatch endEventRecevied = new CountDownLatch(1);
+ CountDownLatch emitEvent = new CountDownLatch(1);
+ try (EventStream s = EventStream.openRepository()) {
+ s.onEvent("Chunk", e -> {
+ if (e.getBoolean("end")) {
+ endEventRecevied.countDown();
+ return;
+ }
+ System.out.println("Stream should start at latest event:");
+ System.out.println(e);
+ });
+
+ ChunkEvent e1 = new ChunkEvent();
+ e1.end = false;
+ e1.commit();
+ s.startAsync();
+ s.onFlush(() -> {
+ emitEvent.countDown();
+ });
+ emitEvent.await();
+ ChunkEvent e2 = new ChunkEvent();
+ e2.end = true;
+ e2.commit();
+
+ endEventRecevied.await();
+ }
+ }
+ }
+}