--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Wed Jul 31 14:07:44 2019 +0200
@@ -0,0 +1,547 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.consumer;
+
+import java.io.IOException;
+import java.lang.invoke.VarHandle;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+import jdk.jfr.EventType;
+import jdk.jfr.internal.LogLevel;
+import jdk.jfr.internal.LogTag;
+import jdk.jfr.internal.Logger;
+import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.consumer.InternalEventFilter;
+
+/*
+ * Purpose of this class is to simplify the implementation of
+ * an event stream. In particular, it handles:
+ *
+ * - configuration storage
+ * - atomic updates to a configuration
+ * - dispatch mechanism
+ * - error handling
+ * - security
+ *
+ */
+abstract class AbstractEventStream implements Runnable {
+
+ public static final class StreamConfiguration {
+ private static final Runnable[] NO_ACTIONS = new Runnable[0];
+
+ private Runnable[] flushActions = NO_ACTIONS;
+ private Runnable[] closeActions = NO_ACTIONS;
+ private EventDispatcher[] dispatchers = NO_DISPATCHERS;
+ private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
+ private boolean closed = false;
+ private boolean reuse = true;
+ private boolean ordered = true;
+ private Instant startTime = null;
+ private boolean started = false;
+ private long startNanos = 0;
+ private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+
+ private boolean changed = false;
+
+ public StreamConfiguration(StreamConfiguration configuration) {
+ this.flushActions = configuration.flushActions;
+ this.closeActions = configuration.closeActions;
+ this.dispatchers = configuration.dispatchers;
+ this.eventFilter = configuration.eventFilter;
+ this.closed = configuration.closed;
+ this.reuse = configuration.reuse;
+ this.ordered = configuration.ordered;
+ this.startTime = configuration.startTime;
+ this.started = configuration.started;
+ this.startNanos = configuration.startNanos;
+ this.dispatcherLookup = configuration.dispatcherLookup;
+ }
+
+ public StreamConfiguration() {
+ }
+
+ final public StreamConfiguration remove(Object action) {
+ flushActions = remove(flushActions, action);
+ closeActions = remove(closeActions, action);
+ dispatchers = removeDispatch(dispatchers, action);
+ return this;
+ }
+
+ final public StreamConfiguration addDispatcher(EventDispatcher e) {
+ dispatchers = add(dispatchers, e);
+ eventFilter = buildFilter(dispatchers);
+ dispatcherLookup = new LongMap<>();
+ return this;
+ }
+
+ final public StreamConfiguration addFlushAction(Runnable action) {
+ flushActions = add(flushActions, action);
+ return this;
+ }
+
+ final public StreamConfiguration addCloseAction(Runnable action) {
+ closeActions = add(closeActions, action);
+ return this;
+ }
+
+ final public StreamConfiguration setClosed(boolean closed) {
+ this.closed = closed;
+ changed = true;
+ return this;
+ }
+
+ final public boolean isClosed() {
+ return closed;
+ }
+
+ final public Runnable[] getCloseActions() {
+ return closeActions;
+ }
+
+ final public Runnable[] getFlushActions() {
+ return flushActions;
+ }
+
+ private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
+ List<EventDispatcher> list = new ArrayList<>(array.length);
+ boolean modified = false;
+ for (int i = 0; i < array.length; i++) {
+ if (array[i].action != action) {
+ list.add(array[i]);
+ } else {
+ modified = true;
+ }
+ }
+ EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
+ if (modified) {
+ eventFilter = buildFilter(result);
+ dispatcherLookup = new LongMap<>();
+ changed = true;
+ }
+ return result;
+ }
+
+ private <T> T[] remove(T[] array, Object action) {
+ List<T> list = new ArrayList<>(array.length);
+ for (int i = 0; i < array.length; i++) {
+ if (array[i] != action) {
+ list.add(array[i]);
+ } else {
+ changed = true;
+ }
+ }
+ return list.toArray(array);
+ }
+
+ private <T> T[] add(T[] array, T object) {
+ List<T> list = new ArrayList<>(Arrays.asList(array));
+ list.add(object);
+ changed = true;
+ return list.toArray(array);
+ }
+
+ private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
+ InternalEventFilter ef = new InternalEventFilter();
+ for (EventDispatcher ed : dispatchers) {
+ String name = ed.eventName;
+ if (name == null) {
+ return InternalEventFilter.ACCEPT_ALL;
+ }
+ ef.setThreshold(name, 0);
+ }
+ return ef.threadSafe();
+ }
+
+ final public StreamConfiguration setReuse(boolean reuse) {
+ this.reuse = reuse;
+ changed = true;
+ return this;
+ }
+
+ final public StreamConfiguration setOrdered(boolean ordered) {
+ this.ordered = ordered;
+ changed = true;
+ return this;
+ }
+
+ final public StreamConfiguration setStartTime(Instant startTime) {
+ this.startTime = startTime;
+ changed = true;
+ return this;
+ }
+
+ final public Instant getStartTime() {
+ return startTime;
+ }
+
+ final public boolean isStarted() {
+ return started;
+ }
+
+ final public StreamConfiguration setStartNanos(long startNanos) {
+ this.startNanos = startNanos;
+ changed = true;
+ return this;
+ }
+
+ final public void setStarted(boolean started) {
+ this.started = started;
+ changed = true;
+ }
+
+ final public boolean hasChanged() {
+ return changed;
+ }
+
+ final public boolean getReuse() {
+ return reuse;
+ }
+
+ final public boolean getOrdered() {
+ return ordered;
+ }
+
+ final public InternalEventFilter getFiler() {
+ return eventFilter;
+ }
+
+ final public long getStartNanos() {
+ return startNanos;
+ }
+
+ final public InternalEventFilter getFilter() {
+ return eventFilter;
+ }
+
+ final public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (Runnable flush : flushActions) {
+ sb.append("Flush Action: ").append(flush).append("\n");
+ }
+ for (Runnable close : closeActions) {
+ sb.append("Close Action: " + close + "\n");
+ }
+ for (EventDispatcher dispatcher : dispatchers) {
+ sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
+ }
+ sb.append("Closed: ").append(closed).append("\n");
+ sb.append("Reuse: ").append(reuse).append("\n");
+ sb.append("Ordered: ").append(ordered).append("\n");
+ sb.append("Started: ").append(started).append("\n");
+ sb.append("Start Time: ").append(startTime).append("\n");
+ sb.append("Start Nanos: ").append(startNanos).append("\n");
+ return sb.toString();
+ }
+
+ private EventDispatcher[] getDispatchers() {
+ return dispatchers;
+ }
+ }
+
+ final static class EventDispatcher {
+ final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
+
+ final private String eventName;
+ final private Consumer<RecordedEvent> action;
+
+ public EventDispatcher(Consumer<RecordedEvent> action) {
+ this(null, action);
+ }
+
+ public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
+ this.eventName = eventName;
+ this.action = action;
+ }
+
+ public void offer(RecordedEvent event) {
+ action.accept(event);
+ }
+
+ public boolean accepts(EventType eventType) {
+ return (eventName == null || eventType.getName().equals(eventName));
+ }
+ }
+
+ public final static Instant NEXT_EVENT = Instant.now();
+ public final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
+
+ private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
+ private final AccessControlContext accessControlContext;
+ private final Thread thread;
+
+ // Update bu updateConfiguration()
+ protected StreamConfiguration configuration = new StreamConfiguration();
+
+ // Cache the last event type and dispatch.
+ private EventType lastEventType;
+ private EventDispatcher[] lastEventDispatch;
+
+ public AbstractEventStream(AccessControlContext acc) throws IOException {
+ this.accessControlContext = acc;
+ // Create thread object in constructor to ensure caller has permission
+ // permission before constructing object
+ thread = new Thread(this);
+ thread.setDaemon(true);
+ }
+
+ public final void run() {
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ execute();
+ return null;
+ }
+ }, accessControlContext);
+
+ }
+
+ private void execute() {
+ // JVM.getJVM().exclude(Thread.currentThread());
+ try {
+ updateStartNanos();
+ process();
+ } catch (IOException e) {
+ if (!isClosed()) {
+ logException(e);
+ }
+ } catch (Exception e) {
+ logException(e);
+ } finally {
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
+ }
+ }
+
+ // User setting overrides default
+ private void updateStartNanos() {
+ if (configuration.getStartTime() != null) {
+ StreamConfiguration c = new StreamConfiguration(configuration);
+ try {
+ c.setStartNanos(c.getStartTime().toEpochMilli() * 1_000_000L);
+ } catch (ArithmeticException ae) {
+ c.setStartNanos(Long.MAX_VALUE);
+ }
+ updateConfiguration(c);
+ }
+ }
+
+ private void logException(Exception e) {
+ // FIXME: e.printStackTrace(); for debugging purposes,
+ // remove before before integration
+ e.printStackTrace();
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage());
+ }
+
+ public abstract void process() throws IOException;
+
+ protected final void clearLastDispatch() {
+ lastEventDispatch = null;
+ lastEventType = null;
+ }
+
+ protected final void dispatch(RecordedEvent event) {
+ EventType type = event.getEventType();
+ EventDispatcher[] ret = null;
+ if (type == lastEventType) {
+ ret = lastEventDispatch;
+ } else {
+ ret = configuration.dispatcherLookup.get(type.getId());
+ if (ret == null) {
+ List<EventDispatcher> list = new ArrayList<>();
+ for (EventDispatcher e : configuration.getDispatchers()) {
+ if (e.accepts(type)) {
+ list.add(e);
+ }
+ }
+ ret = list.isEmpty() ? NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
+ configuration.dispatcherLookup.put(type.getId(), ret);
+ }
+ lastEventDispatch = ret;
+ }
+ for (int i = 0; i < ret.length; i++) {
+ try {
+ ret[i].offer(event);
+ } catch (Exception e) {
+ logException(e);
+ }
+ }
+ }
+
+ public final void runCloseActions() {
+ Runnable[] cas = configuration.getCloseActions();
+ for (int i = 0; i < cas.length; i++) {
+ try {
+ cas[i].run();
+ } catch (Exception e) {
+ logException(e);
+ }
+ }
+ }
+
+ public final void runFlushActions() {
+ Runnable[] fas = configuration.getFlushActions();
+ for (int i = 0; i < fas.length; i++) {
+ try {
+ fas[i].run();
+ } catch (Exception e) {
+ logException(e);
+ }
+ }
+ }
+
+ // Purpose of synchronizing the following methods is
+ // to serialize changes to the configuration, so only one
+ // thread at a time can change the configuration.
+ //
+ // The purpose is not to guard the configuration field. A new
+ // configuration is published using updateConfiguration
+ //
+ public final synchronized boolean remove(Object action) {
+ return updateConfiguration(new StreamConfiguration(configuration).remove(action));
+ }
+
+ public final synchronized void onEvent(Consumer<RecordedEvent> action) {
+ add(new EventDispatcher(action));
+ }
+
+ public final synchronized void onEvent(String eventName, Consumer<RecordedEvent> action) {
+ add(new EventDispatcher(eventName, action));
+ }
+
+ private final synchronized void add(EventDispatcher e) {
+ updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
+ }
+
+ public final synchronized void onFlush(Runnable action) {
+ updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
+ }
+
+ public final synchronized void addCloseAction(Runnable action) {
+ updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
+ }
+
+ public final synchronized void setClosed(boolean closed) {
+ updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
+ }
+
+ public final synchronized void setReuse(boolean reuse) {
+ updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
+ }
+
+ public final synchronized void setOrdered(boolean ordered) {
+ updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
+ }
+
+ public final synchronized void setStartNanos(long startNanos) {
+ updateConfiguration(new StreamConfiguration(configuration).setStartNanos(startNanos));
+ }
+
+ public final synchronized void setStartTime(Instant startTime) {
+ Objects.nonNull(startTime);
+ if (configuration.isStarted()) {
+ throw new IllegalStateException("Stream is already started");
+ }
+ if (startTime == null) {
+ return;
+ }
+ if (startTime.isBefore(Instant.EPOCH)) {
+ startTime = Instant.EPOCH;
+ }
+ updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
+ }
+
+ private boolean updateConfiguration(StreamConfiguration newConfiguration) {
+ // Changes to the configuration must be serialized, so make
+ // sure that we have the monitor
+ Thread.holdsLock(this);
+ if (newConfiguration.hasChanged()) {
+ // Publish objects indirectly held by new configuration object
+ VarHandle.releaseFence();
+ configuration = newConfiguration;
+ // Publish the field reference. Making the field volatile
+ // would be an alternative, but it is repeatedly read.
+ VarHandle.releaseFence();
+ return true;
+ }
+ return false;
+ }
+
+ public final boolean isClosed() {
+ return configuration.isClosed();
+ }
+
+ public final void startAsync(long startNanos) {
+ synchronized (this) {
+ if (configuration.isStarted()) {
+ throw new IllegalStateException("Event stream can only be started once");
+ }
+ StreamConfiguration c = new StreamConfiguration(configuration);
+ c.setStartNanos(startNanos);
+ c.setStarted(true);
+ updateConfiguration(c);
+ }
+ thread.start();
+ }
+
+ public final void start(long startNanos) {
+ synchronized (this) {
+ if (configuration.isStarted()) {
+ throw new IllegalStateException("Event stream can only be started once");
+ }
+ StreamConfiguration c = new StreamConfiguration(configuration);
+ c.setStartNanos(startNanos);
+ c.setStarted(true);
+ updateConfiguration(c);
+ }
+ run();
+ }
+
+ public final void awaitTermination(Duration timeout) {
+ Objects.requireNonNull(timeout);
+ if (thread != Thread.currentThread()) {
+ try {
+ thread.join(timeout.toMillis());
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ public final void awaitTermination() {
+ awaitTermination(Duration.ofMillis(0));
+ }
+
+ abstract public void close();
+}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Wed Jul 31 14:07:44 2019 +0200
@@ -121,6 +121,8 @@
/**
* Reads an event and returns null when segment or chunk ends.
+ *
+ * @param awaitNewEvents wait for new data.
*/
public RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException {
long absoluteChunkEnd = chunkHeader.getEnd();
@@ -192,7 +194,9 @@
}
private boolean awaitUpdatedHeader(long absoluteChunkEnd) throws IOException {
- Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
+ if (Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO)) {
+ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
+ }
while (true) {
chunkHeader.refresh();
if (absoluteChunkEnd != chunkHeader.getEnd()) {
@@ -209,7 +213,7 @@
long thisCP = chunkHeader.getConstantPoolPosition() + chunkHeader.getAbsoluteChunkStart();
long lastCP = -1;
long delta = -1;
- boolean log = Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE);
+ boolean logTrace = Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE);
while (thisCP != abortCP && delta != 0) {
input.position(thisCP);
lastCP = thisCP;
@@ -226,9 +230,11 @@
int poolCount = input.readInt();
final long logLastCP = lastCP;
final long logDelta = delta;
- Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> {
- return "New constant pool: startPosition=" + logLastCP + ", size=" + size + ", deltaToNext=" + logDelta + ", flush=" + flush + ", poolCount=" + poolCount;
- });
+ if (logTrace) {
+ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> {
+ return "New constant pool: startPosition=" + logLastCP + ", size=" + size + ", deltaToNext=" + logDelta + ", flush=" + flush + ", poolCount=" + poolCount;
+ });
+ }
for (int i = 0; i < poolCount; i++) {
long id = input.readLong(); // type id
ConstantLookup lookup = constantLookups.get(id);
@@ -251,7 +257,7 @@
if (count == 0) {
throw new InternalError("Pool " + type.getName() + " must contain at least one element ");
}
- if (log) {
+ if (logTrace) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant Pool " + i + ": " + type.getName());
}
for (int j = 0; j < count; j++) {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Wed Jul 31 14:07:44 2019 +0200
@@ -45,43 +45,52 @@
*/
final class EventDirectoryStream implements EventStream {
- static final class DirectoryConsumer extends EventConsumer {
+ static final class DirectoryStream extends AbstractEventStream {
private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
private static final int DEFAULT_ARRAY_SIZE = 10_000;
+
private final RepositoryFiles repositoryFiles;
+
private ChunkParser chunkParser;
private RecordedEvent[] sortedList;
protected long chunkStartNanos;
- public DirectoryConsumer(AccessControlContext acc, Path p) throws IOException {
+ public DirectoryStream(AccessControlContext acc, Path p) throws IOException {
super(acc);
repositoryFiles = new RepositoryFiles(p);
}
@Override
public void process() throws IOException {
- chunkStartNanos = startNanos;
+ StreamConfiguration c1 = configuration;
+ chunkStartNanos = c1.getStartNanos();
Path path;
- if (startTime == EventConsumer.NEXT_EVENT) {
+ if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) {
// TODO: Need to skip forward to the next event
// For now, use the last chunk.
path = repositoryFiles.lastPath();
} else {
path = repositoryFiles.nextPath(chunkStartNanos);
}
+ if (path == null) { // closed
+ return;
+ }
chunkStartNanos = repositoryFiles.getTimestamp(path) + 1;
try (RecordingInput input = new RecordingInput(path.toFile())) {
- chunkParser = new ChunkParser(input, this.reuse);
+ chunkParser = new ChunkParser(input, c1.getReuse());
while (!isClosed()) {
boolean awaitnewEvent = false;
while (!isClosed() && !chunkParser.isChunkFinished()) {
- chunkParser.setReuse(this.reuse);
- chunkParser.setOrdered(this.ordered);
- chunkParser.setFirstNanos(startNanos);
+ final StreamConfiguration c2 = configuration;
+ boolean ordered = c2.getOrdered();
+ chunkParser.setReuse(c2.getReuse());
+ chunkParser.setOrdered(ordered);
+ chunkParser.setFirstNanos(c2.getStartNanos());
chunkParser.resetEventCache();
- chunkParser.setParserFilter(this.eventFilter);
+ chunkParser.setParserFilter(c2.getFilter());
chunkParser.updateEventParsers();
+ clearLastDispatch();
if (ordered) {
awaitnewEvent = processOrdered(awaitnewEvent);
} else {
@@ -101,6 +110,7 @@
}
}
+
private boolean processOrdered(boolean awaitNewEvents) throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
@@ -139,39 +149,36 @@
while (true) {
RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
if (e == null) {
- awaitNewEvents = true;
- break;
+ return true;
} else {
dispatch(e);
}
}
- return awaitNewEvents;
}
@Override
public void close() {
+ setClosed(true);
repositoryFiles.close();
}
}
- private final EventConsumer eventConsumer;
+ private final AbstractEventStream eventStream;
public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException {
- eventConsumer = new DirectoryConsumer(acc, p);
- eventConsumer.startTime = startTime;
+ eventStream = new DirectoryStream(acc, p);
+ eventStream.setStartTime(startTime);
}
+ @Override
public void close() {
- eventConsumer.close();
+ eventStream.close();
}
+ @Override
public void onFlush(Runnable action) {
Objects.requireNonNull(action);
- eventConsumer.onFlush(action);
- }
-
- void start(long startNanos) {
- eventConsumer.start(startNanos);
+ eventStream.onFlush(action);
}
@Override
@@ -184,58 +191,62 @@
startAsync(Instant.now().toEpochMilli() * 1000 * 1000L);
}
- void startAsync(long startNanos) {
- eventConsumer.startAsync(startNanos);
- }
-
@Override
public void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- eventConsumer.onEvent(action);
+ eventStream.onEvent(action);
}
@Override
public void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- eventConsumer.onEvent(eventName, action);
+ eventStream.onEvent(eventName, action);
}
@Override
public void onClose(Runnable action) {
Objects.requireNonNull(action);
- eventConsumer.addCloseAction(action);
+ eventStream.addCloseAction(action);
}
@Override
public boolean remove(Object action) {
Objects.requireNonNull(action);
- return eventConsumer.remove(action);
+ return eventStream.remove(action);
}
@Override
public void awaitTermination(Duration timeout) {
Objects.requireNonNull(timeout);
- eventConsumer.awaitTermination(timeout);
+ eventStream.awaitTermination(timeout);
}
@Override
public void awaitTermination() {
- eventConsumer.awaitTermination(Duration.ofMillis(0));
+ eventStream.awaitTermination(Duration.ofMillis(0));
}
@Override
public void setReuse(boolean reuse) {
- eventConsumer.setReuse(reuse);
+ eventStream.setReuse(reuse);
}
@Override
public void setOrdered(boolean ordered) {
- eventConsumer.setOrdered(ordered);
+ eventStream.setOrdered(ordered);
}
@Override
public void setStartTime(Instant startTime) {
- eventConsumer.setStartTime(startTime);
+ eventStream.setStartTime(startTime);
+ }
+
+ public void start(long startNanos) {
+ eventStream.start(startNanos);
+ }
+
+ public void startAsync(long startNanos) {
+ eventStream.startAsync(startNanos);
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Wed Jul 31 14:07:44 2019 +0200
@@ -43,39 +43,41 @@
*/
final class EventFileStream implements EventStream {
- private final static class FileConsumer extends EventConsumer {
+ private final static class FileStream extends AbstractEventStream {
private static final int DEFAULT_ARRAY_SIZE = 100_000;
+
private final RecordingInput input;
+
private ChunkParser chunkParser;
- private boolean reuse = true;
private RecordedEvent[] sortedList;
- private boolean ordered;
- public FileConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
+ public FileStream(AccessControlContext acc, Path path) throws IOException {
super(acc);
- this.input = input;
- }
+ this.input = new RecordingInput(path.toFile());
+; }
@Override
public void process() throws IOException {
- chunkParser = new ChunkParser(input, reuse);
+ StreamConfiguration c1 = configuration;
+ chunkParser = new ChunkParser(input, c1.getReuse());
while (!isClosed()) {
- boolean reuse = this.reuse;
- boolean ordered = this.ordered;
- chunkParser.setReuse(reuse);
+ StreamConfiguration c2 = configuration;
+ boolean ordered = c2.getOrdered();
+ chunkParser.setReuse(c2.getReuse());
chunkParser.setOrdered(ordered);
chunkParser.resetEventCache();
- chunkParser.setParserFilter(eventFilter);
+ chunkParser.setParserFilter(c2.getFiler());
chunkParser.updateEventParsers();
+ clearLastDispatch();
if (ordered) {
processOrdered();
} else {
processUnordered();
}
+ runFlushActions();
if (chunkParser.isLastChunk()) {
return;
}
- runFlushActions();
chunkParser = chunkParser.nextChunkParser();
}
}
@@ -115,106 +117,94 @@
}
}
- public void setReuse(boolean reuse) {
- this.reuse = reuse;
- }
-
- public void setOrdered(boolean ordered) {
- this.ordered = ordered;
- }
-
@Override
public void close() {
-
+ setClosed(true);;
+ runCloseActions();
+ try {
+ input.close();
+ } catch (IOException e) {
+ // ignore
+ }
}
-
-
}
- private final RecordingInput input;
- private final FileConsumer eventConsumer;
+ private final FileStream eventStream;
public EventFileStream(Path path, Instant from, Instant to) throws IOException {
Objects.requireNonNull(path);
- input = new RecordingInput(path.toFile());
- eventConsumer = new FileConsumer(AccessController.getContext(), input);
+ eventStream = new FileStream(AccessController.getContext(), path);
}
@Override
public void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- eventConsumer.onEvent(action);
+ eventStream.onEvent(action);
}
@Override
public void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- eventConsumer.onEvent(eventName, action);
+ eventStream.onEvent(eventName, action);
}
@Override
public void onFlush(Runnable action) {
Objects.requireNonNull(action);
- eventConsumer.onFlush(action);
+ eventStream.onFlush(action);
}
@Override
public void onClose(Runnable action) {
Objects.requireNonNull(action);
- eventConsumer.addCloseAction(action);
+ eventStream.addCloseAction(action);
}
@Override
public void close() {
- eventConsumer.setClosed(true);
- eventConsumer.runCloseActions();
- try {
- input.close();
- } catch (IOException e) {
- // ignore
- }
+ eventStream.close();
}
@Override
public boolean remove(Object action) {
Objects.requireNonNull(action);
- return eventConsumer.remove(action);
+ return eventStream.remove(action);
}
@Override
public void start() {
- eventConsumer.start(0);
+ eventStream.start(0);
}
@Override
public void setReuse(boolean reuse) {
- eventConsumer.setReuse(reuse);
+ eventStream.setReuse(reuse);
}
@Override
public void startAsync() {
- eventConsumer.startAsync(0);
+ eventStream.startAsync(0);
}
@Override
public void awaitTermination(Duration timeout) {
Objects.requireNonNull(timeout);
- eventConsumer.awaitTermination(timeout);
+ eventStream.awaitTermination(timeout);
}
@Override
public void awaitTermination() {
- eventConsumer.awaitTermination();
+ eventStream.awaitTermination();
}
@Override
public void setOrdered(boolean ordered) {
- eventConsumer.setOrdered(ordered);
+ eventStream.setOrdered(ordered);
}
@Override
public void setStartTime(Instant startTime) {
- eventConsumer.setStartTime(startTime);
+ eventStream.setStartTime(startTime);
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Wed Jul 31 14:07:44 2019 +0200
@@ -50,7 +50,7 @@
* during reading
*/
public static EventStream openRepository(Path directory) throws IOException {
- return new EventDirectoryStream(AccessController.getContext(), directory, EventConsumer.NEXT_EVENT);
+ return new EventDirectoryStream(AccessController.getContext(), directory, AbstractEventStream.NEXT_EVENT);
}
/**
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java Wed Jul 31 14:07:44 2019 +0200
@@ -360,7 +360,8 @@
}
public static boolean exists(SafePath safePath) throws IOException {
- return doPrivilegedIOWithReturn(() -> Files.exists(safePath.toPath()));
+ // Files.exist(path) is allocation intensive
+ return doPrivilegedIOWithReturn(() -> safePath.toPath().toFile().exists());
}
public static boolean isDirectory(SafePath safePath) throws IOException {
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Fri Jul 12 15:04:28 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Wed Jul 31 14:07:44 2019 +0200
@@ -66,7 +66,7 @@
public Path nextPath(long startTimeNanos) {
while (!closed) {
if (startTimeNanos == -1) {
- Entry<Long, Path> e = pathSet.lastEntry();
+ Entry<Long, Path> e = pathSet.lastEntry();
if (e != null) {
return e.getValue();
}
@@ -106,17 +106,16 @@
boolean foundNew = false;
List<Path> added = new ArrayList<>();
Set<Path> current = new HashSet<>();
- if (!Files.exists(repo)) {
- // Repository removed, probably due to shutdown
- return true;
- }
- try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo, "*.jfr")) {
+ try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(repo)) {
for (Path p : dirStream) {
if (!pathLookup.containsKey(p)) {
- added.add(p);
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
+ String s = p.toString();
+ if (s.endsWith(".jfr")) {
+ added.add(p);
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath());
+ }
+ current.add(p);
}
- current.add(p);
}
}
List<Path> removed = new ArrayList<>();
@@ -160,5 +159,4 @@
}
}
-
}
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java Wed Jul 31 14:07:44 2019 +0200
@@ -0,0 +1,2 @@
+package jdk.jfr.internal.consumer;
+
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnFlush.java Fri Jul 12 15:04:28 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnFlush.java Wed Jul 31 14:07:44 2019 +0200
@@ -43,12 +43,12 @@
}
public static void main(String... args) throws Exception {
- testOnFLushNull();
+ testOnFlushNull();
testOneEvent();
testNoEvent();
}
- private static void testOnFLushNull() {
+ private static void testOnFlushNull() {
try (RecordingStream rs = new RecordingStream()) {
try {
rs.onFlush(null);