--- a/src/java.desktop/windows/native/common/awt/systemscale/systemScale.cpp Thu Sep 12 20:46:55 2019 -0700
+++ b/src/java.desktop/windows/native/common/awt/systemscale/systemScale.cpp Fri Sep 13 18:46:07 2019 +0200
@@ -77,7 +77,7 @@
HRESULT res = D2D1CreateFactory(D2D1_FACTORY_TYPE_SINGLE_THREADED,
&m_pDirect2dFactory);
if (res == S_OK) {
- m_pDirect2dFactory->GetDesktopDpi(dpiX, dpiY);
+ // m_pDirect2dFactory->GetDesktopDpi(dpiX, dpiY);
m_pDirect2dFactory->Release();
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Fri Sep 13 18:46:07 2019 +0200
@@ -26,20 +26,16 @@
package jdk.jfr.consumer;
import java.io.IOException;
-import java.lang.invoke.VarHandle;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.Comparator;
-import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import jdk.jfr.EventType;
import jdk.jfr.internal.JVM;
import jdk.jfr.internal.LogLevel;
import jdk.jfr.internal.LogTag;
@@ -59,40 +55,19 @@
*/
abstract class AbstractEventStream implements EventStream {
- final static class EventDispatcher {
- final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
- final String eventName;
- final Consumer<RecordedEvent> action;
-
- public EventDispatcher(Consumer<RecordedEvent> action) {
- this(null, action);
- }
+ static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
- public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
- this.eventName = eventName;
- this.action = action;
- }
-
- public void offer(RecordedEvent event) {
- action.accept(event);
- }
-
- public boolean accepts(EventType eventType) {
- return (eventName == null || eventType.getName().equals(eventName));
- }
- }
-
- final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
private final static AtomicLong counter = new AtomicLong(1);
- private volatile Thread thread;
private final Object terminated = new Object();
private final boolean active;
- private final Runnable flushOperation = () -> runFlushActions();
+ private final Runnable flushOperation = () -> dispatcher().runFlushActions();
private final AccessControlContext accessControllerContext;
- private final Object configurationLock = new Object();
+ private final StreamConfiguration configuration = new StreamConfiguration();
- // Modified by updateConfiguration()
- protected volatile StreamConfiguration configuration = new StreamConfiguration();
+ private volatile Thread thread;
+ private Dispatcher dispatcher;
+
+ private volatile boolean closed;
public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
this.accessControllerContext = Objects.requireNonNull(acc);
@@ -108,99 +83,85 @@
@Override
abstract public void close();
- // Purpose of synchronizing the following methods is
- // to serialize changes to the configuration so only one
- // thread at a time can change the configuration.
- //
- // The purpose is not to guard the configuration field. A new
- // configuration is published using updateConfiguration
- //
+ protected final Dispatcher dispatcher() {
+ if (configuration.hasChanged()) {
+ synchronized (configuration) {
+ dispatcher = new Dispatcher(configuration);
+ }
+ }
+ return dispatcher;
+ }
+
@Override
public final void setOrdered(boolean ordered) {
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
- }
+ configuration.setOrdered(ordered);
}
@Override
public final void setReuse(boolean reuse) {
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
- }
+ configuration.setReuse(reuse);
}
@Override
public final void setStartTime(Instant startTime) {
Objects.nonNull(startTime);
- synchronized (configurationLock) {
- if (configuration.isStarted()) {
+ synchronized (configuration) {
+ if (configuration.started) {
throw new IllegalStateException("Stream is already started");
}
if (startTime.isBefore(Instant.EPOCH)) {
startTime = Instant.EPOCH;
}
- updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
+ configuration.setStartTime(startTime);
}
}
@Override
public final void setEndTime(Instant endTime) {
Objects.requireNonNull(endTime);
- synchronized (configurationLock) {
- if (configuration.isStarted()) {
+ synchronized (configuration) {
+ if (configuration.started) {
throw new IllegalStateException("Stream is already started");
}
- updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
+ configuration.setEndTime(endTime);
}
}
@Override
public final void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- add(new EventDispatcher(action));
- }
+ configuration.addEventAction(action);
}
@Override
public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- add(new EventDispatcher(eventName, action));
- }
+ configuration.addEventAction(eventName, action);
}
@Override
public final void onFlush(Runnable action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
- }
+ configuration.addFlushAction(action);
}
@Override
public final void onClose(Runnable action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
- }
+ configuration.addCloseAction(action);
}
@Override
public final void onError(Consumer<Throwable> action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
- }
+ configuration.addErrorAction(action);
}
@Override
public final boolean remove(Object action) {
Objects.requireNonNull(action);
- synchronized (configurationLock) {
- return updateConfiguration(new StreamConfiguration(configuration).remove(action));
- }
+ return configuration.remove(action);
}
@Override
@@ -247,53 +208,12 @@
protected abstract void process() throws Exception;
- protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
- EventType type = event.getEventType();
- EventDispatcher[] dispatchers = null;
- if (type == c.cacheEventType) {
- dispatchers = c.cacheDispatchers;
- } else {
- dispatchers = c.dispatcherLookup.get(type.getId());
- if (dispatchers == null) {
- List<EventDispatcher> list = new ArrayList<>();
- for (EventDispatcher e : c.getDispatchers()) {
- if (e.accepts(type)) {
- list.add(e);
- }
- }
- dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
- c.dispatcherLookup.put(type.getId(), dispatchers);
- }
- c.cacheDispatchers = dispatchers;
- }
- for (int i = 0; i < dispatchers.length; i++) {
- try {
- dispatchers[i].offer(event);
- } catch (Exception e) {
- handleError(e);
- }
- }
- }
-
- protected final void runCloseActions() {
- Runnable[] closeActions = configuration.getCloseActions();
- for (int i = 0; i < closeActions.length; i++) {
- try {
- closeActions[i].run();
- } catch (Exception e) {
- handleError(e);
- }
- }
- }
-
protected final void setClosed(boolean closed) {
- synchronized (configurationLock) {
- updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
- }
+ this.closed = closed;
}
protected final boolean isClosed() {
- return configuration.isClosed();
+ return closed;
}
protected final void startAsync(long startNanos) {
@@ -313,34 +233,15 @@
return flushOperation;
}
- private void add(EventDispatcher e) {
- updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
- }
-
- private boolean updateConfiguration(StreamConfiguration newConfiguration) {
- if (!Thread.holdsLock(configurationLock)) {
- throw new InternalError("Modification of configuration without proper lock");
- }
- if (newConfiguration.hasChanged()) {
- // Publish objects held by configuration object
- VarHandle.releaseFence();
- configuration = newConfiguration;
- return true;
- }
- return false;
- }
-
private void startInternal(long startNanos) {
- synchronized (configurationLock) {
- if (configuration.isStarted()) {
+ synchronized (configuration) {
+ if (configuration.started) {
throw new IllegalStateException("Event stream can only be started once");
}
- StreamConfiguration c = new StreamConfiguration(configuration);
if (active) {
- c.setStartNanos(startNanos);
+ configuration.setStartNanos(startNanos);
}
- c.setStarted(true);
- updateConfiguration(c);
+ configuration.setStarted(true);
}
}
@@ -348,8 +249,14 @@
JVM.getJVM().exclude(Thread.currentThread());
try {
process();
+ } catch (IOException ioe) {
+ // This can happen if a chunk file is removed, or
+ // a file is access that has been closed
+ // This is "normal" behavior for streaming and the
+ // stream will be closed when this happens
} catch (Exception e) {
- defaultErrorHandler(e);
+ // TODO: Remove before integrating
+ e.printStackTrace();
} finally {
Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
try {
@@ -362,30 +269,6 @@
}
}
- private void handleError(Throwable e) {
- Consumer<?>[] consumers = configuration.errorActions;
- if (consumers.length == 0) {
- defaultErrorHandler(e);
- return;
- }
- for (int i = 0; i < consumers.length; i++) {
- @SuppressWarnings("unchecked")
- Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
- conusmer.accept(e);
- }
- }
-
- private void runFlushActions() {
- Runnable[] flushActions = configuration.getFlushActions();
- for (int i = 0; i < flushActions.length; i++) {
- try {
- flushActions[i].run();
- } catch (Exception e) {
- handleError(e);
- }
- }
- }
-
private void run(AccessControlContext accessControlContext) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
@@ -400,8 +283,4 @@
counter.incrementAndGet();
return "JFR Event Stream " + counter;
}
-
- private void defaultErrorHandler(Throwable e) {
- e.printStackTrace();
- }
}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri Sep 13 18:46:07 2019 +0200
@@ -48,6 +48,28 @@
*
*/
final class ChunkParser {
+
+ static final class ParserConfiguration {
+ final boolean reuse;
+ final boolean ordered;
+ final InternalEventFilter eventFilter;
+
+ long filterStart;
+ long filterEnd;
+
+ public ParserConfiguration(long filterStart, long filterEnd, boolean reuse, boolean ordered, InternalEventFilter filter) {
+ this.filterStart = filterStart;
+ this.filterEnd = filterEnd;
+ this.reuse = reuse;
+ this.ordered = ordered;
+ this.eventFilter = filter;
+ }
+
+ public ParserConfiguration() {
+ this(0, Long.MAX_VALUE, false, false, InternalEventFilter.ACCEPT_ALL);
+ }
+ }
+
// Checkpoint that finishes a flush segment
static final byte CHECKPOINT_FLUSH_MASK = 1;
// Checkpoint contains chunk header information in the first pool
@@ -70,24 +92,24 @@
private LongMap<Type> typeMap;
private LongMap<Parser> parsers;
private boolean chunkFinished;
- private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
- private boolean reuse;
- private boolean ordered;
- private boolean resetEventCache;
- private long filterStart = 0;
- private long filterEnd = Long.MAX_VALUE;
+
private Runnable flushOperation;
+ private ParserConfiguration configuration;
- public ChunkParser(RecordingInput input, boolean reuse) throws IOException {
- this(new ChunkHeader(input), null, 1000);
- this.reuse = reuse;
+ public ChunkParser(RecordingInput input) throws IOException {
+ this(input, new ParserConfiguration());
+ }
+
+ public ChunkParser(RecordingInput input, ParserConfiguration pc) throws IOException {
+ this(new ChunkHeader(input), null, pc);
}
public ChunkParser(ChunkParser previous) throws IOException {
- this(new ChunkHeader(previous.input), previous, 1000);
+ this(new ChunkHeader(previous.input), previous, new ParserConfiguration());
}
- private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException {
+ private ChunkParser(ChunkHeader header, ChunkParser previous, ParserConfiguration pc) throws IOException {
+ this.configuration = pc;
this.input = header.getInput();
this.chunkHeader = header;
if (previous == null) {
@@ -98,9 +120,7 @@
this.constantLookups = previous.constantLookups;
this.previousMetadata = previous.metadata;
this.pollInterval = previous.pollInterval;
- this.ordered = previous.ordered;
- this.reuse = previous.reuse;
- this.eventFilter = previous.eventFilter;
+ this.configuration = previous.configuration;
}
this.metadata = header.readMetadata(previousMetadata);
this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset());
@@ -108,7 +128,7 @@
ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
parsers = factory.getParsers();
typeMap = factory.getTypeMap();
- updateEventParsers();
+ updateConfiguration();
} else {
parsers = previous.parsers;
typeMap = previous.typeMap;
@@ -122,12 +142,37 @@
input.position(chunkHeader.getEventStart());
}
- public void setParserFilter(InternalEventFilter filter) {
- this.eventFilter = filter;
+ public ChunkParser nextChunkParser() throws IOException {
+ return new ChunkParser(chunkHeader.nextHeader(), this, configuration);
+ }
+
+ private void updateConfiguration() {
+ updateConfiguration(configuration, false);
}
- public InternalEventFilter getEventFilter() {
- return this.eventFilter;
+ public void updateConfiguration(ParserConfiguration configuration, boolean resetEventCache) {
+ this.configuration = configuration;
+ parsers.forEach(p -> {
+ if (p instanceof EventParser) {
+ EventParser ep = (EventParser) p;
+ if (resetEventCache) {
+ ep.resetCache();
+ }
+ String name = ep.getEventType().getName();
+ ep.setOrdered(configuration.ordered);
+ ep.setReuse(configuration.reuse);
+ ep.setFilterStart(configuration.filterStart);
+ ep.setFilterEnd(configuration.filterEnd);
+ long threshold = configuration.eventFilter.getThreshold(name);
+ if (threshold >= 0) {
+ ep.setEnabled(true);
+ ep.setThresholdNanos(threshold);
+ } else {
+ ep.setEnabled(false);
+ ep.setThresholdNanos(Long.MAX_VALUE);
+ }
+ }
+ });
}
/**
@@ -161,7 +206,7 @@
ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
parsers = factory.getParsers();
typeMap = factory.getTypeMap();
- updateEventParsers();
+ updateConfiguration();;
}
if (contantPosition != chunkHeader.getConstantPoolPosition()) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
@@ -372,76 +417,14 @@
return new ChunkParser(this);
}
- public ChunkParser nextChunkParser() throws IOException {
- return new ChunkParser(chunkHeader.nextHeader(), this, pollInterval);
- }
-
public boolean isChunkFinished() {
return chunkFinished;
}
- // Need to call updateEventParsers() for
- // change to take effect
- public void setReuse(boolean resue) {
- this.reuse = resue;
- }
-
public void setFlushOperation(Runnable flushOperation) {
this.flushOperation = flushOperation;
}
- // Need to call updateEventParsers() for
- // change to take effect
- public void setOrdered(boolean ordered) {
- this.ordered = ordered;
- }
-
- // Need to call updateEventParsers() for
- // change to take effect
- public void setFilterStart(long filterStart) {
- long chunkStart = chunkHeader.getStartNanos();
- // Optimization.
- if (filterStart < chunkStart - 1_000_000_000L) {
- filterStart = 0;
- }
- this.filterStart = filterStart;
- }
-
- public void setFilterEnd(long filterEnd) {
- this.filterEnd = filterEnd;
- }
-
- // Need to call updateEventParsers() for
- // change to take effect
- public void resetEventCache() {
- this.resetEventCache = true;
- }
-
- public void updateEventParsers() {
- parsers.forEach(p -> {
- if (p instanceof EventParser) {
- EventParser ep = (EventParser) p;
- String name = ep.getEventType().getName();
- ep.setOrdered(ordered);
- ep.setReuse(reuse);
- ep.setFilterStart(filterStart);
- ep.setFilterEnd(filterEnd);
- if (resetEventCache) {
- ep.resetCache();
- }
- long threshold = eventFilter.getThreshold(name);
- if (threshold >= 0) {
- ep.setEnabled(true);
- ep.setThresholdNanos(threshold);
- } else {
- ep.setEnabled(false);
- ep.setThresholdNanos(Long.MAX_VALUE);
- }
- }
- });
- resetEventCache = false;
- }
-
public long getChunkDuration() {
return chunkHeader.getDurationNanos();
}
@@ -449,4 +432,5 @@
public long getStartNanos() {
return chunkHeader.getStartNanos();
}
+
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/Dispatcher.java Fri Sep 13 18:46:07 2019 +0200
@@ -0,0 +1,144 @@
+package jdk.jfr.consumer;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import jdk.jfr.EventType;
+import jdk.jfr.consumer.ChunkParser.ParserConfiguration;
+import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.consumer.InternalEventFilter;
+
+public final class Dispatcher {
+
+ public final static class EventDispatcher {
+ final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
+ final String eventName;
+ final Consumer<RecordedEvent> action;
+
+ public EventDispatcher(Consumer<RecordedEvent> action) {
+ this(null, action);
+ }
+
+ public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
+ this.eventName = eventName;
+ this.action = action;
+ }
+
+ public void offer(RecordedEvent event) {
+ action.accept(event);
+ }
+
+ public boolean accepts(EventType eventType) {
+ return (eventName == null || eventType.getName().equals(eventName));
+ }
+ }
+
+ final Consumer<Throwable>[] errorActions;
+ final Runnable[] flushActions;
+ final Runnable[] closeActions;
+ final EventDispatcher[] dispatchers;
+ final LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+ final ParserConfiguration parserConfiguration;
+ final Instant startTime;
+ final Instant endTime;
+ final long startNanos;
+ final long endNanos;
+
+ // Cache
+ private EventType cacheEventType;
+ private EventDispatcher[] cacheDispatchers;
+
+ @SuppressWarnings({"unchecked","rawtypes"})
+ public Dispatcher(StreamConfiguration c) {
+ this.flushActions = c.flushActions.toArray(new Runnable[0]);
+ this.closeActions = c.closeActions.toArray(new Runnable[0]);
+ this.errorActions = c.errorActions.toArray(new Consumer[0]);
+ this.dispatchers = c.eventActions.toArray(new EventDispatcher[0]);
+ this.parserConfiguration = new ParserConfiguration(0, Long.MAX_VALUE, c.reuse, c.ordered, buildFilter(dispatchers));
+ this.startTime = c.startTime;
+ this.endTime = c.endTime;
+ this.startNanos = c.startNanos;
+ this.endNanos = c.endNanos;
+ }
+
+ private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
+ InternalEventFilter ef = new InternalEventFilter();
+ for (EventDispatcher ed : dispatchers) {
+ String name = ed.eventName;
+ if (name == null) {
+ return InternalEventFilter.ACCEPT_ALL;
+ }
+ ef.setThreshold(name, 0);
+ }
+ return ef;
+ }
+
+ protected final void dispatch(RecordedEvent event) {
+ EventType type = event.getEventType();
+ EventDispatcher[] dispatchers = null;
+ if (type == cacheEventType) {
+ dispatchers = cacheDispatchers;
+ } else {
+ dispatchers = dispatcherLookup.get(type.getId());
+ if (dispatchers == null) {
+ List<EventDispatcher> list = new ArrayList<>();
+ for (EventDispatcher e : this.dispatchers) {
+ if (e.accepts(type)) {
+ list.add(e);
+ }
+ }
+ dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
+ dispatcherLookup.put(type.getId(), dispatchers);
+ }
+ cacheDispatchers = dispatchers;
+ }
+ for (int i = 0; i < dispatchers.length; i++) {
+ try {
+ dispatchers[i].offer(event);
+ } catch (Exception e) {
+ handleError(e);
+ }
+ }
+ }
+
+ public void handleError(Throwable e) {
+ Consumer<?>[] consumers = this.errorActions;
+ if (consumers.length == 0) {
+ defaultErrorHandler(e);
+ return;
+ }
+ for (int i = 0; i < consumers.length; i++) {
+ @SuppressWarnings("unchecked")
+ Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
+ conusmer.accept(e);
+ }
+ }
+
+ public void runFlushActions() {
+ Runnable[] flushActions = this.flushActions;
+ for (int i = 0; i < flushActions.length; i++) {
+ try {
+ flushActions[i].run();
+ } catch (Exception e) {
+ handleError(e);
+ }
+ }
+ }
+
+ public void runCloseActions() {
+ Runnable[] closeActions = this.closeActions;
+ for (int i = 0; i < closeActions.length; i++) {
+ try {
+ closeActions[i].run();
+ } catch (Exception e) {
+ handleError(e);
+ }
+ }
+ }
+
+ void defaultErrorHandler(Throwable e) {
+ e.printStackTrace();
+ }
+}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Sep 13 18:46:07 2019 +0200
@@ -32,6 +32,7 @@
import java.util.Arrays;
import java.util.Objects;
+import jdk.jfr.consumer.ChunkParser.ParserConfiguration;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.RecordingInput;
@@ -46,6 +47,7 @@
private final RepositoryFiles repositoryFiles;
private final boolean active;
private final FileAccess fileAccess;
+
private ChunkParser chunkParser;
private long chunkStartNanos;
private RecordedEvent[] sortedList;
@@ -60,7 +62,7 @@
@Override
public void close() {
setClosed(true);
- runCloseActions();
+ dispatcher().runCloseActions();
repositoryFiles.close();
}
@@ -76,11 +78,12 @@
@Override
protected void process() throws Exception {
- StreamConfiguration c = configuration;
+ Dispatcher disp = dispatcher();
+
Path path;
- boolean validStartTime = active || c.getStartTime() != null;
+ boolean validStartTime = active || disp.startTime != null;
if (validStartTime) {
- path = repositoryFiles.firstPath(c.getStartNanos());
+ path = repositoryFiles.firstPath(disp.startNanos);
} else {
path = repositoryFiles.lastPath();
}
@@ -89,28 +92,24 @@
}
chunkStartNanos = repositoryFiles.getTimestamp(path);
try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
- chunkParser = new ChunkParser(input, c.getReuse());
+ chunkParser = new ChunkParser(input, disp.parserConfiguration);
long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
- long filtertStart = validStartTime ? c.getStartNanos() : segmentStart;
- long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE;
+ long filterStart = validStartTime ? disp.startNanos : segmentStart;
+ long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
+
while (!isClosed()) {
boolean awaitnewEvent = false;
while (!isClosed() && !chunkParser.isChunkFinished()) {
- c = configuration;
- boolean ordered = c.getOrdered();
+ disp = dispatcher();
+ ParserConfiguration pc = disp.parserConfiguration;
+ pc.filterStart = filterStart;
+ pc.filterEnd = filterEnd;
+ chunkParser.updateConfiguration(pc, true);
chunkParser.setFlushOperation(getFlushOperation());
- chunkParser.setReuse(c.getReuse());
- chunkParser.setOrdered(ordered);
- chunkParser.setFilterStart(filtertStart);
- chunkParser.setFilterEnd(filterEnd);
- chunkParser.resetEventCache();
- chunkParser.setParserFilter(c.getFilter());
- chunkParser.updateEventParsers();
- c.clearDispatchCache();
- if (ordered) {
- awaitnewEvent = processOrdered(c, awaitnewEvent);
+ if (pc.ordered) {
+ awaitnewEvent = processOrdered(disp, awaitnewEvent);
} else {
- awaitnewEvent = processUnordered(c, awaitnewEvent);
+ awaitnewEvent = processUnordered(disp, awaitnewEvent);
}
if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
close();
@@ -135,7 +134,7 @@
}
}
- private boolean processOrdered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
+ private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[100_000];
}
@@ -164,18 +163,18 @@
Arrays.sort(sortedList, 0, index, END_TIME);
}
for (int i = 0; i < index; i++) {
- dispatch(c, sortedList[i]);
+ c.dispatch(sortedList[i]);
}
return awaitNewEvents;
}
- private boolean processUnordered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
+ private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
while (true) {
RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
if (e == null) {
return true;
} else {
- dispatch(c, e);
+ c.dispatch(e);
}
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Sep 13 18:46:07 2019 +0200
@@ -62,7 +62,7 @@
@Override
public void close() {
setClosed(true);
- runCloseActions();
+ dispatcher().runCloseActions();
try {
input.close();
} catch (IOException e) {
@@ -72,46 +72,40 @@
@Override
protected void process() throws IOException {
- StreamConfiguration c = configuration;
+ Dispatcher disp = dispatcher();
long start = 0;
long end = Long.MAX_VALUE;
- if (c.getStartTime() != null) {
- start = c.getStartNanos();
+ if (disp.startTime != null) {
+ start = disp.startNanos;
}
- if (c.getEndTime() != null) {
- end = c.getEndNanos();
+ if (disp.endTime != null) {
+ end = disp.endNanos;
}
- chunkParser = new ChunkParser(input, c.getReuse());
+ chunkParser = new ChunkParser(input, disp.parserConfiguration);
while (!isClosed()) {
if (chunkParser.getStartNanos() > end) {
close();
return;
}
- c = configuration;
- boolean ordered = c.getOrdered();
+ disp = dispatcher();
+ disp.parserConfiguration.filterStart = start;
+ disp.parserConfiguration.filterEnd = end;
+ chunkParser.updateConfiguration(disp.parserConfiguration, true);
chunkParser.setFlushOperation(getFlushOperation());
- chunkParser.setFilterStart(start);
- chunkParser.setFilterEnd(end);
- chunkParser.setReuse(c.getReuse());
- chunkParser.setOrdered(ordered);
- chunkParser.resetEventCache();
- chunkParser.setParserFilter(c.getFiler());
- chunkParser.updateEventParsers();
- c.clearDispatchCache();
- if (ordered) {
- processOrdered(c);
+ if (disp.parserConfiguration.ordered) {
+ processOrdered(disp);
} else {
- processUnordered(c);
+ processUnordered(disp);
}
- if (chunkParser.isLastChunk()) {
+ if (isClosed() || chunkParser.isLastChunk()) {
return;
}
chunkParser = chunkParser.nextChunkParser();
}
}
- private void processOrdered(StreamConfiguration c) throws IOException {
+ private void processOrdered(Dispatcher c) throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[10_000];
}
@@ -122,7 +116,7 @@
if (event == null) {
Arrays.sort(sortedList, 0, index, END_TIME);
for (int i = 0; i < index; i++) {
- dispatch(c, sortedList[i]);
+ c.dispatch(sortedList[i]);
}
return;
}
@@ -135,13 +129,13 @@
}
}
- private void processUnordered(StreamConfiguration c) throws IOException {
+ private void processUnordered(Dispatcher c) throws IOException {
while (!isClosed()) {
RecordedEvent event = chunkParser.readEvent();
if (event == null) {
return;
}
- dispatch(c, event);
+ c.dispatch(event);
}
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri Sep 13 18:46:07 2019 +0200
@@ -40,12 +40,60 @@
/**
* Represents a stream of events.
+ * <p>
+ * A stream is a sequence of events and the way to interact with a stream is to
+ * register actions.
+ * <p>
+ * To receive a notification when an event arrives, register an action using the
+ * {@link #onEvent(Consumer)} method. To filter the stream for an event with a
+ * specific name, use {@link #onEvent(String, Consumer)} method.
+ *
+ * By default, the same {@code RecordedEvent} object can be used for
+ * representing two or more distinct events. The object can be delivered
+ * multiple times to the same action as well as to other actions. If the life
+ * cycle of the event object is needed outside the scope of an action, the
+ * {@link #setReuse(boolean)} method should be set to {@code false} so that a
+ * new object is allocated for each event.
+ *
+ * <p>
+ * Events are delivered in batches. To receive a notification when a batch is
+ * complete, register an action using the {@link #onFlush(Runnable)} method.
+ * This is an opportunity to aggregate or push data to external systems while
+ * the Java Virtual Machine (JVM) is preparing the next batch.
+ * <p>
+ * Events within a batch are sorted chronologically by their end time. If
+ * ordering is not a concern, sorting can be disabled using the
+ * {@link #setOrdered(boolean)} method.
+ * <p>
+ * To dispatch events to registered actions, the stream must be started. To
+ * start processing in the current thread, invoke the {@link #start()} method.
+ * To process actions asynchronously in a separate thread, invoke the
+ * {@link #startAsync()} method. To await completion of the stream, use the
+ * awaitTermination {@link #awaitTermination()} or the {link
+ * {@link #awaitTermination(Duration)} method.
+ * <p>
+ * When a stream ends it is automatically closed. To manually stop processing of
+ * events, close the stream with the {@link #close()} method. A stream can also
+ * be automatically closed in exceptional circumstances, for instance if the JVM
+ * exits. To receive a notification in any of these occasions, use the
+ * {@link #onClose(Runnable)} method to register an action.
+ * <p>
+ * If an unexpected exception occurs in an action, it is possible to catch the
+ * exception in an error handler. An error handler can be registered using the
+ * {@link #onError(Runnable)} method. If no error handler is registered, the
+ * default behavior is to print the exception and its backtrace to the standard
+ * error stream.
+ * <p>
+ * The following example demonstrates how an {@code EventStream} can be used to
+ * listen to garbage collection and CPU Load events
+ * <p>
+ *
*/
public interface EventStream extends AutoCloseable {
/**
- * Creates a stream from the disk repository of the current Java Virtual
- * Machine (JVM).
+ * Creates a stream from the repository of the current Java Virtual Machine
+ * (JVM).
* <p>
* By default, the stream starts with the next event flushed by Flight
* Recorder.
@@ -96,7 +144,7 @@
*
* @return an event stream, not {@code null}
*
- * @throws IOException if a stream can't be opened, or an I/O error occurs
+ * @throws IOException if the file can't be opened, or an I/O error occurs
* during reading
*
* @throws SecurityException if a security manager exists and its
@@ -107,71 +155,75 @@
}
/**
- * Performs an action on all events in the stream.
+ * Registers an action to perform on all events in the stream.
*
- * @param action an action to be performed on each {@code RecordedEvent},
- * not {@code null}
+ * @param action an action to perform on each {@code RecordedEvent}, not
+ * {@code null}
*/
void onEvent(Consumer<RecordedEvent> action);
/**
- * Performs an action on all events in the stream with a specified name.
+ * Registers an action to perform on all events matching a name.
*
* @param eventName the name of the event, not {@code null}
*
- * @param action an action to be performed on each {@code RecordedEvent}
- * that matches the event name, not {@code null}
+ * @param action an action to perform on each {@code RecordedEvent} matching
+ * the event name, not {@code null}
*/
void onEvent(String eventName, Consumer<RecordedEvent> action);
/**
- * Performs an action when the event stream has been flushed.
+ * Registers an action to perform after the stream has been flushed.
*
- * @param action an action to be performed after stream has been flushed,
- * not {@code null}
+ * @param action an action to perform after the stream has been
+ * flushed, not {@code null}
*/
void onFlush(Runnable action);
/**
- * Performs an action if an exception occurs when processing the stream.
+ * Registers an action to perform if an exception occurs.
* <p>
- * if an error handler has not been added to the stream, an exception stack
- * trace is printed to standard error.
+ * if an action is not registered, an exception stack trace is printed to
+ * standard error.
* <p>
- * Adding an error handler overrides the default behavior. If multiple error
- * handlers have been added, they will be executed in the order they were
- * added.
+ * Registering an action overrides the default behavior. If multiple actions
+ * have been registered, they are performed in the order of registration.
+ * <p>
+ * If this method itself throws an exception, resulting behavior is
+ * undefined.
*
- * @param action an action to be performed if an exception occurs, not
+ * @param action an action to perform if an exception occurs, not
* {@code null}
*/
void onError(Consumer<Throwable> action);
/**
- * Performs an action when the event stream is closed.
+ * Registers an action to perform when the stream is closed.
* <p>
- * If the stream is already closed, the action will be executed immediately
+ * If the stream is already closed, the action will be performed immediately
* in the current thread.
*
- * @param action an action to be performed after the stream has been closed,
- * not {@code null}
+ * @param action an action to perform after the stream is closed, not
+ * {@code null}
+ * @see #close()
*/
void onClose(Runnable action);
/**
- * Releases all resources associated with this event stream.
+ * Releases all resources associated with this stream.
*/
void close();
/**
- * Removes an action from the stream.
+ * Unregisters an action.
* <p>
- * If the action has been added multiple times, all instance of it will be
- * removed.
+ * If the action has been registered multiple times, all instances are
+ * unregistered.
*
- * @param action the action to remove, not {@code null}
+ * @param action the action to unregister, not {@code null}
*
- * @return {@code true} if the action was removed, {@code false} otherwise
+ * @return {@code true} if the action was unregistered, {@code false}
+ * otherwise
*
* @see #onEvent(Consumer)
* @see #onEvent(String, Consumer)
@@ -183,91 +235,98 @@
/**
* Specifies that the event object in an {@link #onEvent(Consumer)} action
- * is to be reused.
+ * can be reused.
* <p>
- * If reuse is set to {@code true), a callback should not keep a reference
- * to the event object after the callback from {@code onEvent} has returned.
+ * If reuse is set to {@code true), an action should not keep a reference
+ * to the event object after the action has completed.
*
- * @param resuse if event objects can be reused between calls to
- * {@code #onEvent(Consumer)}
- *
+ * @param reuse {@code true} if an event object can be reused, {@code false}
+ * otherwise
*/
- public void setReuse(boolean reuse);
+ void setReuse(boolean reuse);
/**
* Specifies that events arrives in chronological order, sorted by the time
- * they were committed to the event stream.
+ * they were committed to the stream.
*
* @param ordered if event objects arrive in chronological order to
* {@code #onEvent(Consumer)}
*/
- public void setOrdered(boolean ordered);
+ void setOrdered(boolean ordered);
/**
- * Specifies start time of the event stream.
+ * Specifies the start time of the stream.
* <p>
- * The start time must be set before the stream is started.
+ * The start time must be set before starting the stream
*
* @param startTime the start time, not {@code null}
*
- * @throws IllegalStateException if the stream has already been started
+ * @throws IllegalStateException if the stream is already started
+ *
+ * @see #start()
+ * @see #startAsync()
*/
- public void setStartTime(Instant startTime);
+ void setStartTime(Instant startTime);
/**
- * Specifies end time of the event stream.
+ * Specifies the end time of the stream.
* <p>
- * The end time must be set before the stream is started.
+ * The end time must be set before starting the stream.
* <p>
- * When the end time is reached the stream is closed.
+ * At end time, the stream is closed.
*
* @param endTime the end time, not {@code null}
*
- * @throws IllegalStateException if the stream has already been started
+ * @throws IllegalStateException if the stream is already started
+ *
+ * @see #start()
+ * @see #startAsync()
*/
- public void setEndTime(Instant endTime);
+ void setEndTime(Instant endTime);
/**
- * Start processing events in the stream.
+ * Start processing of actions.
* <p>
- * All actions performed on this stream will happen in the current thread.
+ * Actions are performed in the current thread.
*
- * @throws IllegalStateException if the stream is already started or if it
- * has been closed
+ * @throws IllegalStateException if the stream is already started or closed
*/
void start();
/**
- * Start processing events in the stream asynchronously.
+ * Start asynchronous processing of actions.
* <p>
- * All actions on this stream will be performed in a separate thread.
+ * Actions are performed in a single separate thread.
*
- * @throws IllegalStateException if the stream is already started, or if it
- * has been closed
+ * @throws IllegalStateException if the stream is already started or closed
*/
void startAsync();
/**
- * Blocks the current thread until the stream is finished, closed, or it
- * times out.
+ * Blocks until all actions are completed, or the stream is closed, or the
+ * timeout occurs, or the current thread is interrupted, whichever happens
+ * first.
*
* @param timeout the maximum time to wait, not {@code null}
*
* @throws IllegalArgumentException if timeout is negative
- * @throws InterruptedException
+ * @throws InterruptedException if interrupted while waiting
*
* @see #start()
* @see #startAsync()
+ * @see Thread#interrupt()
*/
void awaitTermination(Duration timeout) throws InterruptedException;
/**
- * Blocks the current thread until the stream is finished or closed.
+ * Blocks until all actions are completed, or the stream is closed, or the
+ * current thread is interrupted, whichever happens first.
*
- * @throws InterruptedException
+ * @throws InterruptedException if interrupted while waiting
*
* @see #start()
* @see #startAsync()
+ * @see Thread#interrupt()
*/
void awaitTermination() throws InterruptedException;
}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java Fri Sep 13 18:46:07 2019 +0200
@@ -238,7 +238,7 @@
private void findNext() throws IOException {
while (nextEvent == null) {
if (chunkParser == null) {
- chunkParser = new ChunkParser(input, false);
+ chunkParser = new ChunkParser(input);
} else if (!chunkParser.isLastChunk()) {
chunkParser = chunkParser.nextChunkParser();
} else {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Fri Sep 13 18:46:07 2019 +0200
@@ -47,7 +47,7 @@
* A recording stream produces events from the current JVM (Java Virtual
* Machine).
* <p>
- * The following example, shows how to record events using the default
+ * The following example shows how to record events using the default
* configuration and print the Garbage Collection, CPU Load and JVM Information
* event to standard out.
*
@@ -110,7 +110,7 @@
* </code>
* </pre>
*
- * @param configuration configuration that contains the settings to be use,
+ * @param configuration configuration that contains the settings to use,
* not {@code null}
*
* @throws IllegalStateException if Flight Recorder can't be created (for
@@ -146,16 +146,17 @@
}
/**
- * Replaces all settings for this recording stream
+ * Replaces all settings for this recording stream.
* <p>
- * The following example records 20 second using the "default" configuration
- * and then changes to settings for the "profile" configuration.
+ * The following example records 20 seconds using the "default" configuration
+ * and then changes settings to the "profile" configuration.
*
* <pre>
* <code>
- * var defaultConfiguration = Configuration.getConfiguration("default");
- * var profileConfiguration = Configuration.getConfiguration("profile");
- * try (var rs = new RecordingStream(defaultConfiguration) {
+ * Configuration defaultConfiguration = Configuration.getConfiguration("default");
+ * Configuration profileConfiguration = Configuration.getConfiguration("profile");
+ * try (RecordingStream rs = new RecordingStream(defaultConfiguration) {
+ * rs.onEvent(System.out::println);
* rs.startAsync();
* Thread.sleep(20_000);
* rs.setSettings(profileConfiguration.getSettings());
@@ -165,6 +166,8 @@
* </pre>
*
* @param settings the settings to set, not {@code null}
+ *
+ * @see Recording#setSettings(Map)
*/
public void setSettings(Map<String, String> settings) {
recording.setSettings(settings);
@@ -217,8 +220,7 @@
}
/**
- * Determines how far back data is kept for the stream, if the stream can't
- * keep up.
+ * Determines how far back data is kept for the stream.
* <p>
* To control the amount of recording data stored on disk, the maximum
* length of time to retain the data can be specified. Data stored on disk
@@ -241,8 +243,7 @@
}
/**
- * Determines how much data is kept in the disk repository if the stream
- * can't keep up.
+ * Determines how much data is kept for the stream.
* <p>
* To control the amount of recording data that is stored on disk, the
* maximum amount of data to retain can be specified. When the maximum limit
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java Fri Sep 13 18:46:07 2019 +0200
@@ -2,267 +2,101 @@
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
-import jdk.jfr.EventType;
-import jdk.jfr.consumer.AbstractEventStream.EventDispatcher;
-import jdk.jfr.internal.LongMap;
+import jdk.jfr.consumer.Dispatcher.EventDispatcher;
import jdk.jfr.internal.Utils;
-import jdk.jfr.internal.consumer.InternalEventFilter;
final class StreamConfiguration {
- private static final Runnable[] NO_ACTIONS = new Runnable[0];
-
- Consumer<?>[] errorActions = new Consumer<?>[0];
- private Runnable[] flushActions = NO_ACTIONS;
- private Runnable[] closeActions = NO_ACTIONS;
- private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
- private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
- LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+ final List<Runnable> closeActions = new ArrayList<>();
+ final List<Runnable> flushActions = new ArrayList<>();
+ final List<EventDispatcher> eventActions = new ArrayList<>();
+ final List<Consumer<Throwable>> errorActions = new ArrayList<>();
- private boolean changedConfiguration = false;
- private boolean closed = false;
- private boolean reuse = true;
- private boolean ordered = true;
- private Instant startTime = null;
- private Instant endTime = null;
- private boolean started = false;
- private long startNanos = 0;
- private long endNanos = Long.MAX_VALUE;
+ boolean reuse = true;
+ boolean ordered = true;
+ Instant startTime = null;
+ Instant endTime = null;
+ boolean started = false;
+ long startNanos = 0;
+ long endNanos = Long.MAX_VALUE;
- // Cache the last event type and dispatch.
- EventType cacheEventType;
- EventDispatcher[] cacheDispatchers;
-
+ volatile boolean changed = true;
- public StreamConfiguration(StreamConfiguration configuration) {
- this.flushActions = configuration.flushActions;
- this.closeActions = configuration.closeActions;
- this.errorActions = configuration.errorActions;
- this.dispatchers = configuration.dispatchers;
- this.eventFilter = configuration.eventFilter;
- this.closed = configuration.closed;
- this.reuse = configuration.reuse;
- this.ordered = configuration.ordered;
- this.startTime = configuration.startTime;
- this.endTime = configuration.endTime;
- this.started = configuration.started;
- this.startNanos = configuration.startNanos;
- this.endNanos = configuration.endNanos;
- this.dispatcherLookup = configuration.dispatcherLookup;
+ public synchronized boolean remove(Object action) {
+ boolean removed = false;
+ removed |= flushActions.removeIf(e -> e == action);
+ removed |= closeActions.removeIf(e -> e == action);
+ removed |= errorActions.removeIf(e -> e == action);
+ removed |= eventActions.removeIf(e -> e.action == action);
+ if (removed) {
+ changed = true;
+ }
+ return removed;
}
- public StreamConfiguration() {
- }
-
- public StreamConfiguration remove(Object action) {
- flushActions = remove(flushActions, action);
- closeActions = remove(closeActions, action);
- errorActions = remove(errorActions, action);
- dispatchers = removeDispatch(dispatchers, action);
- return this;
- }
-
- public StreamConfiguration addDispatcher(EventDispatcher e) {
- dispatchers = add(dispatchers, e);
- eventFilter = buildFilter(dispatchers);
- dispatcherLookup = new LongMap<>();
- return this;
- }
-
- public StreamConfiguration addFlushAction(Runnable action) {
- flushActions = add(flushActions, action);
- return this;
+ public synchronized void addEventAction(String name, Consumer<RecordedEvent> consumer) {
+ eventActions.add(new EventDispatcher(name, consumer));
+ changed = true;
}
- public StreamConfiguration addCloseAction(Runnable action) {
- closeActions = add(closeActions, action);
- return this;
- }
-
- public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
- errorActions = add(errorActions, action);
- return this;
- }
-
- public StreamConfiguration setClosed(boolean closed) {
- this.closed = closed;
- changedConfiguration = true;
- return this;
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- public Runnable[] getCloseActions() {
- return closeActions;
+ public void addEventAction(Consumer<RecordedEvent> action) {
+ addEventAction(null, action);
}
- public Runnable[] getFlushActions() {
- return flushActions;
+ public synchronized void addFlushAction(Runnable action) {
+ flushActions.add(action);
+ changed = true;
}
- private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
- List<EventDispatcher> list = new ArrayList<>(array.length);
- boolean modified = false;
- for (int i = 0; i < array.length; i++) {
- if (array[i].action != action) {
- list.add(array[i]);
- } else {
- modified = true;
- }
- }
- EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
- if (modified) {
- eventFilter = buildFilter(result);
- dispatcherLookup = new LongMap<>();
- changedConfiguration = true;
- }
- return result;
+ public synchronized void addCloseAction(Runnable action) {
+ closeActions.add(action);
+ changed = true;
}
- private <T> T[] remove(T[] array, Object action) {
- List<T> list = new ArrayList<>(array.length);
- for (int i = 0; i < array.length; i++) {
- if (array[i] != action) {
- list.add(array[i]);
- } else {
- changedConfiguration = true;
- }
- }
- return list.toArray(array);
+ public synchronized void addErrorAction(Consumer<Throwable> action) {
+ errorActions.add(action);
+ changed = true;
}
- private <T> T[] add(T[] array, T object) {
- List<T> list = new ArrayList<>(Arrays.asList(array));
- list.add(object);
- changedConfiguration = true;
- return list.toArray(array);
+ public synchronized void setReuse(boolean reuse) {
+ this.reuse = reuse;
+ changed = true;
}
- private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
- InternalEventFilter ef = new InternalEventFilter();
- for (EventDispatcher ed : dispatchers) {
- String name = ed.eventName;
- if (name == null) {
- return InternalEventFilter.ACCEPT_ALL;
- }
- ef.setThreshold(name, 0);
- }
- return ef;
+ public synchronized void setOrdered(boolean ordered) {
+ this.ordered = ordered;
+ changed = true;
}
- public StreamConfiguration setReuse(boolean reuse) {
- this.reuse = reuse;
- changedConfiguration = true;
- return this;
- }
-
- public StreamConfiguration setOrdered(boolean ordered) {
- this.ordered = ordered;
- changedConfiguration = true;
- return this;
- }
-
- public StreamConfiguration setEndTime(Instant endTime) {
+ public synchronized void setEndTime(Instant endTime) {
this.endTime = endTime;
this.endNanos = Utils.timeToNanos(endTime);
- changedConfiguration = true;
- return this;
+ changed = true;
}
- public StreamConfiguration setStartTime(Instant startTime) {
+ public synchronized void setStartTime(Instant startTime) {
this.startTime = startTime;
this.startNanos = Utils.timeToNanos(startTime);
- changedConfiguration = true;
- return this;
- }
-
- public Instant getStartTime() {
- return startTime;
+ changed = true;
}
- public Object getEndTime() {
- return endTime;
- }
-
- public boolean isStarted() {
- return started;
+ public synchronized void setStartNanos(long startNanos) {
+ this.startNanos = startNanos;
+ changed = true;
}
- public StreamConfiguration setStartNanos(long startNanos) {
- this.startNanos = startNanos;
- changedConfiguration = true;
- return this;
- }
-
- public void setStarted(boolean started) {
+ public synchronized void setStarted(boolean started) {
this.started = started;
- changedConfiguration = true;
+ changed = true;
}
public boolean hasChanged() {
- return changedConfiguration;
- }
-
- public boolean getReuse() {
- return reuse;
- }
-
- public boolean getOrdered() {
- return ordered;
- }
-
- public InternalEventFilter getFiler() {
- return eventFilter;
- }
-
- public long getStartNanos() {
- return startNanos;
- }
-
- public long getEndNanos() {
- return endNanos;
- }
-
- public InternalEventFilter getFilter() {
- return eventFilter;
+ return changed;
}
- public void clearDispatchCache() {
- cacheDispatchers = null;
- cacheEventType = null;
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (Runnable flush : flushActions) {
- sb.append("Flush Action: ").append(flush).append("\n");
- }
- for (Runnable close : closeActions) {
- sb.append("Close Action: " + close + "\n");
- }
- for (Consumer<?> error : errorActions) {
- sb.append("Error Action: " + error + "\n");
- }
- for (EventDispatcher dispatcher : dispatchers) {
- sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
- }
- sb.append("Closed: ").append(closed).append("\n");
- sb.append("Reuse: ").append(reuse).append("\n");
- sb.append("Ordered: ").append(ordered).append("\n");
- sb.append("Started: ").append(started).append("\n");
- sb.append("Start Time: ").append(startTime).append("\n");
- sb.append("Start Nanos: ").append(startNanos).append("\n");
- sb.append("End Time: ").append(endTime).append("\n");
- sb.append("End Nanos: ").append(endNanos).append("\n");
- return sb.toString();
- }
-
- EventDispatcher[] getDispatchers() {
- return dispatchers;
+ public synchronized void clearChanged() {
+ changed = false;
}
}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/EventInstrumentation.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/EventInstrumentation.java Fri Sep 13 18:46:07 2019 +0200
@@ -121,7 +121,7 @@
private final boolean untypedEventHandler;
private boolean guardHandlerReference;
private Class<?> superClass;
- private final static boolean streamingCommit = true; //!SecuritySupport.getBooleanProperty("jfr.instrument.streaming");
+ private final static boolean streamingCommit = false; //!SecuritySupport.getBooleanProperty("jfr.instrument.streaming");
EventInstrumentation(Class<?> superClass, byte[] bytes, long id) {
this.superClass = superClass;
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java Fri Sep 13 18:46:07 2019 +0200
@@ -601,6 +601,7 @@
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
+ // ok
}
}
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Thu Sep 12 20:46:55 2019 -0700
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Fri Sep 13 18:46:07 2019 +0200
@@ -78,7 +78,7 @@
RecordingStream r = new RecordingStream();
AtomicLong count = new AtomicLong();
r.onEvent(e -> {
- if (count.incrementAndGet() == 100) {
+ if (count.incrementAndGet() > 100) {
streaming.countDown();
}
});