# HG changeset patch # User egahlin # Date 1568393167 -7200 # Node ID 7b751fe181a53cc81be7a6e947c46fb74daad7ce # Parent 6f8f18ac1d541bd5b14b6f4febf7206f4661bd32 Restructure stream configuration diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/java.desktop/windows/native/common/awt/systemscale/systemScale.cpp --- a/src/java.desktop/windows/native/common/awt/systemscale/systemScale.cpp Thu Sep 12 20:46:55 2019 -0700 +++ b/src/java.desktop/windows/native/common/awt/systemscale/systemScale.cpp Fri Sep 13 18:46:07 2019 +0200 @@ -77,7 +77,7 @@ HRESULT res = D2D1CreateFactory(D2D1_FACTORY_TYPE_SINGLE_THREADED, &m_pDirect2dFactory); if (res == S_OK) { - m_pDirect2dFactory->GetDesktopDpi(dpiX, dpiY); + // m_pDirect2dFactory->GetDesktopDpi(dpiX, dpiY); m_pDirect2dFactory->Release(); } } diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Fri Sep 13 18:46:07 2019 +0200 @@ -26,20 +26,16 @@ package jdk.jfr.consumer; import java.io.IOException; -import java.lang.invoke.VarHandle; import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Comparator; -import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import jdk.jfr.EventType; import jdk.jfr.internal.JVM; import jdk.jfr.internal.LogLevel; import jdk.jfr.internal.LogTag; @@ -59,40 +55,19 @@ */ abstract class AbstractEventStream implements EventStream { - final static class EventDispatcher { - final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; - final String eventName; - final Consumer action; - - public EventDispatcher(Consumer action) { - this(null, action); - } + static final Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); - public EventDispatcher(String eventName, Consumer action) { - this.eventName = eventName; - this.action = action; - } - - public void offer(RecordedEvent event) { - action.accept(event); - } - - public boolean accepts(EventType eventType) { - return (eventName == null || eventType.getName().equals(eventName)); - } - } - - final static Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); private final static AtomicLong counter = new AtomicLong(1); - private volatile Thread thread; private final Object terminated = new Object(); private final boolean active; - private final Runnable flushOperation = () -> runFlushActions(); + private final Runnable flushOperation = () -> dispatcher().runFlushActions(); private final AccessControlContext accessControllerContext; - private final Object configurationLock = new Object(); + private final StreamConfiguration configuration = new StreamConfiguration(); - // Modified by updateConfiguration() - protected volatile StreamConfiguration configuration = new StreamConfiguration(); + private volatile Thread thread; + private Dispatcher dispatcher; + + private volatile boolean closed; public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException { this.accessControllerContext = Objects.requireNonNull(acc); @@ -108,99 +83,85 @@ @Override abstract public void close(); - // Purpose of synchronizing the following methods is - // to serialize changes to the configuration so only one - // thread at a time can change the configuration. - // - // The purpose is not to guard the configuration field. A new - // configuration is published using updateConfiguration - // + protected final Dispatcher dispatcher() { + if (configuration.hasChanged()) { + synchronized (configuration) { + dispatcher = new Dispatcher(configuration); + } + } + return dispatcher; + } + @Override public final void setOrdered(boolean ordered) { - synchronized (configurationLock) { - updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered)); - } + configuration.setOrdered(ordered); } @Override public final void setReuse(boolean reuse) { - synchronized (configurationLock) { - updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse)); - } + configuration.setReuse(reuse); } @Override public final void setStartTime(Instant startTime) { Objects.nonNull(startTime); - synchronized (configurationLock) { - if (configuration.isStarted()) { + synchronized (configuration) { + if (configuration.started) { throw new IllegalStateException("Stream is already started"); } if (startTime.isBefore(Instant.EPOCH)) { startTime = Instant.EPOCH; } - updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime)); + configuration.setStartTime(startTime); } } @Override public final void setEndTime(Instant endTime) { Objects.requireNonNull(endTime); - synchronized (configurationLock) { - if (configuration.isStarted()) { + synchronized (configuration) { + if (configuration.started) { throw new IllegalStateException("Stream is already started"); } - updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime)); + configuration.setEndTime(endTime); } } @Override public final void onEvent(Consumer action) { Objects.requireNonNull(action); - synchronized (configurationLock) { - add(new EventDispatcher(action)); - } + configuration.addEventAction(action); } @Override public final void onEvent(String eventName, Consumer action) { Objects.requireNonNull(eventName); Objects.requireNonNull(action); - synchronized (configurationLock) { - add(new EventDispatcher(eventName, action)); - } + configuration.addEventAction(eventName, action); } @Override public final void onFlush(Runnable action) { Objects.requireNonNull(action); - synchronized (configurationLock) { - updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action)); - } + configuration.addFlushAction(action); } @Override public final void onClose(Runnable action) { Objects.requireNonNull(action); - synchronized (configurationLock) { - updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action)); - } + configuration.addCloseAction(action); } @Override public final void onError(Consumer action) { Objects.requireNonNull(action); - synchronized (configurationLock) { - updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action)); - } + configuration.addErrorAction(action); } @Override public final boolean remove(Object action) { Objects.requireNonNull(action); - synchronized (configurationLock) { - return updateConfiguration(new StreamConfiguration(configuration).remove(action)); - } + return configuration.remove(action); } @Override @@ -247,53 +208,12 @@ protected abstract void process() throws Exception; - protected final void dispatch(StreamConfiguration c, RecordedEvent event) { - EventType type = event.getEventType(); - EventDispatcher[] dispatchers = null; - if (type == c.cacheEventType) { - dispatchers = c.cacheDispatchers; - } else { - dispatchers = c.dispatcherLookup.get(type.getId()); - if (dispatchers == null) { - List list = new ArrayList<>(); - for (EventDispatcher e : c.getDispatchers()) { - if (e.accepts(type)) { - list.add(e); - } - } - dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]); - c.dispatcherLookup.put(type.getId(), dispatchers); - } - c.cacheDispatchers = dispatchers; - } - for (int i = 0; i < dispatchers.length; i++) { - try { - dispatchers[i].offer(event); - } catch (Exception e) { - handleError(e); - } - } - } - - protected final void runCloseActions() { - Runnable[] closeActions = configuration.getCloseActions(); - for (int i = 0; i < closeActions.length; i++) { - try { - closeActions[i].run(); - } catch (Exception e) { - handleError(e); - } - } - } - protected final void setClosed(boolean closed) { - synchronized (configurationLock) { - updateConfiguration(new StreamConfiguration(configuration).setClosed(closed)); - } + this.closed = closed; } protected final boolean isClosed() { - return configuration.isClosed(); + return closed; } protected final void startAsync(long startNanos) { @@ -313,34 +233,15 @@ return flushOperation; } - private void add(EventDispatcher e) { - updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e)); - } - - private boolean updateConfiguration(StreamConfiguration newConfiguration) { - if (!Thread.holdsLock(configurationLock)) { - throw new InternalError("Modification of configuration without proper lock"); - } - if (newConfiguration.hasChanged()) { - // Publish objects held by configuration object - VarHandle.releaseFence(); - configuration = newConfiguration; - return true; - } - return false; - } - private void startInternal(long startNanos) { - synchronized (configurationLock) { - if (configuration.isStarted()) { + synchronized (configuration) { + if (configuration.started) { throw new IllegalStateException("Event stream can only be started once"); } - StreamConfiguration c = new StreamConfiguration(configuration); if (active) { - c.setStartNanos(startNanos); + configuration.setStartNanos(startNanos); } - c.setStarted(true); - updateConfiguration(c); + configuration.setStarted(true); } } @@ -348,8 +249,14 @@ JVM.getJVM().exclude(Thread.currentThread()); try { process(); + } catch (IOException ioe) { + // This can happen if a chunk file is removed, or + // a file is access that has been closed + // This is "normal" behavior for streaming and the + // stream will be closed when this happens } catch (Exception e) { - defaultErrorHandler(e); + // TODO: Remove before integrating + e.printStackTrace(); } finally { Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); try { @@ -362,30 +269,6 @@ } } - private void handleError(Throwable e) { - Consumer[] consumers = configuration.errorActions; - if (consumers.length == 0) { - defaultErrorHandler(e); - return; - } - for (int i = 0; i < consumers.length; i++) { - @SuppressWarnings("unchecked") - Consumer conusmer = (Consumer) consumers[i]; - conusmer.accept(e); - } - } - - private void runFlushActions() { - Runnable[] flushActions = configuration.getFlushActions(); - for (int i = 0; i < flushActions.length; i++) { - try { - flushActions[i].run(); - } catch (Exception e) { - handleError(e); - } - } - } - private void run(AccessControlContext accessControlContext) { AccessController.doPrivileged(new PrivilegedAction() { @Override @@ -400,8 +283,4 @@ counter.incrementAndGet(); return "JFR Event Stream " + counter; } - - private void defaultErrorHandler(Throwable e) { - e.printStackTrace(); - } } \ No newline at end of file diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri Sep 13 18:46:07 2019 +0200 @@ -48,6 +48,28 @@ * */ final class ChunkParser { + + static final class ParserConfiguration { + final boolean reuse; + final boolean ordered; + final InternalEventFilter eventFilter; + + long filterStart; + long filterEnd; + + public ParserConfiguration(long filterStart, long filterEnd, boolean reuse, boolean ordered, InternalEventFilter filter) { + this.filterStart = filterStart; + this.filterEnd = filterEnd; + this.reuse = reuse; + this.ordered = ordered; + this.eventFilter = filter; + } + + public ParserConfiguration() { + this(0, Long.MAX_VALUE, false, false, InternalEventFilter.ACCEPT_ALL); + } + } + // Checkpoint that finishes a flush segment static final byte CHECKPOINT_FLUSH_MASK = 1; // Checkpoint contains chunk header information in the first pool @@ -70,24 +92,24 @@ private LongMap typeMap; private LongMap parsers; private boolean chunkFinished; - private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; - private boolean reuse; - private boolean ordered; - private boolean resetEventCache; - private long filterStart = 0; - private long filterEnd = Long.MAX_VALUE; + private Runnable flushOperation; + private ParserConfiguration configuration; - public ChunkParser(RecordingInput input, boolean reuse) throws IOException { - this(new ChunkHeader(input), null, 1000); - this.reuse = reuse; + public ChunkParser(RecordingInput input) throws IOException { + this(input, new ParserConfiguration()); + } + + public ChunkParser(RecordingInput input, ParserConfiguration pc) throws IOException { + this(new ChunkHeader(input), null, pc); } public ChunkParser(ChunkParser previous) throws IOException { - this(new ChunkHeader(previous.input), previous, 1000); + this(new ChunkHeader(previous.input), previous, new ParserConfiguration()); } - private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException { + private ChunkParser(ChunkHeader header, ChunkParser previous, ParserConfiguration pc) throws IOException { + this.configuration = pc; this.input = header.getInput(); this.chunkHeader = header; if (previous == null) { @@ -98,9 +120,7 @@ this.constantLookups = previous.constantLookups; this.previousMetadata = previous.metadata; this.pollInterval = previous.pollInterval; - this.ordered = previous.ordered; - this.reuse = previous.reuse; - this.eventFilter = previous.eventFilter; + this.configuration = previous.configuration; } this.metadata = header.readMetadata(previousMetadata); this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset()); @@ -108,7 +128,7 @@ ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); parsers = factory.getParsers(); typeMap = factory.getTypeMap(); - updateEventParsers(); + updateConfiguration(); } else { parsers = previous.parsers; typeMap = previous.typeMap; @@ -122,12 +142,37 @@ input.position(chunkHeader.getEventStart()); } - public void setParserFilter(InternalEventFilter filter) { - this.eventFilter = filter; + public ChunkParser nextChunkParser() throws IOException { + return new ChunkParser(chunkHeader.nextHeader(), this, configuration); + } + + private void updateConfiguration() { + updateConfiguration(configuration, false); } - public InternalEventFilter getEventFilter() { - return this.eventFilter; + public void updateConfiguration(ParserConfiguration configuration, boolean resetEventCache) { + this.configuration = configuration; + parsers.forEach(p -> { + if (p instanceof EventParser) { + EventParser ep = (EventParser) p; + if (resetEventCache) { + ep.resetCache(); + } + String name = ep.getEventType().getName(); + ep.setOrdered(configuration.ordered); + ep.setReuse(configuration.reuse); + ep.setFilterStart(configuration.filterStart); + ep.setFilterEnd(configuration.filterEnd); + long threshold = configuration.eventFilter.getThreshold(name); + if (threshold >= 0) { + ep.setEnabled(true); + ep.setThresholdNanos(threshold); + } else { + ep.setEnabled(false); + ep.setThresholdNanos(Long.MAX_VALUE); + } + } + }); } /** @@ -161,7 +206,7 @@ ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); parsers = factory.getParsers(); typeMap = factory.getTypeMap(); - updateEventParsers(); + updateConfiguration();; } if (contantPosition != chunkHeader.getConstantPoolPosition()) { Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values"); @@ -372,76 +417,14 @@ return new ChunkParser(this); } - public ChunkParser nextChunkParser() throws IOException { - return new ChunkParser(chunkHeader.nextHeader(), this, pollInterval); - } - public boolean isChunkFinished() { return chunkFinished; } - // Need to call updateEventParsers() for - // change to take effect - public void setReuse(boolean resue) { - this.reuse = resue; - } - public void setFlushOperation(Runnable flushOperation) { this.flushOperation = flushOperation; } - // Need to call updateEventParsers() for - // change to take effect - public void setOrdered(boolean ordered) { - this.ordered = ordered; - } - - // Need to call updateEventParsers() for - // change to take effect - public void setFilterStart(long filterStart) { - long chunkStart = chunkHeader.getStartNanos(); - // Optimization. - if (filterStart < chunkStart - 1_000_000_000L) { - filterStart = 0; - } - this.filterStart = filterStart; - } - - public void setFilterEnd(long filterEnd) { - this.filterEnd = filterEnd; - } - - // Need to call updateEventParsers() for - // change to take effect - public void resetEventCache() { - this.resetEventCache = true; - } - - public void updateEventParsers() { - parsers.forEach(p -> { - if (p instanceof EventParser) { - EventParser ep = (EventParser) p; - String name = ep.getEventType().getName(); - ep.setOrdered(ordered); - ep.setReuse(reuse); - ep.setFilterStart(filterStart); - ep.setFilterEnd(filterEnd); - if (resetEventCache) { - ep.resetCache(); - } - long threshold = eventFilter.getThreshold(name); - if (threshold >= 0) { - ep.setEnabled(true); - ep.setThresholdNanos(threshold); - } else { - ep.setEnabled(false); - ep.setThresholdNanos(Long.MAX_VALUE); - } - } - }); - resetEventCache = false; - } - public long getChunkDuration() { return chunkHeader.getDurationNanos(); } @@ -449,4 +432,5 @@ public long getStartNanos() { return chunkHeader.getStartNanos(); } + } diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/consumer/Dispatcher.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/Dispatcher.java Fri Sep 13 18:46:07 2019 +0200 @@ -0,0 +1,144 @@ +package jdk.jfr.consumer; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import jdk.jfr.EventType; +import jdk.jfr.consumer.ChunkParser.ParserConfiguration; +import jdk.jfr.internal.LongMap; +import jdk.jfr.internal.consumer.InternalEventFilter; + +public final class Dispatcher { + + public final static class EventDispatcher { + final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; + final String eventName; + final Consumer action; + + public EventDispatcher(Consumer action) { + this(null, action); + } + + public EventDispatcher(String eventName, Consumer action) { + this.eventName = eventName; + this.action = action; + } + + public void offer(RecordedEvent event) { + action.accept(event); + } + + public boolean accepts(EventType eventType) { + return (eventName == null || eventType.getName().equals(eventName)); + } + } + + final Consumer[] errorActions; + final Runnable[] flushActions; + final Runnable[] closeActions; + final EventDispatcher[] dispatchers; + final LongMap dispatcherLookup = new LongMap<>(); + final ParserConfiguration parserConfiguration; + final Instant startTime; + final Instant endTime; + final long startNanos; + final long endNanos; + + // Cache + private EventType cacheEventType; + private EventDispatcher[] cacheDispatchers; + + @SuppressWarnings({"unchecked","rawtypes"}) + public Dispatcher(StreamConfiguration c) { + this.flushActions = c.flushActions.toArray(new Runnable[0]); + this.closeActions = c.closeActions.toArray(new Runnable[0]); + this.errorActions = c.errorActions.toArray(new Consumer[0]); + this.dispatchers = c.eventActions.toArray(new EventDispatcher[0]); + this.parserConfiguration = new ParserConfiguration(0, Long.MAX_VALUE, c.reuse, c.ordered, buildFilter(dispatchers)); + this.startTime = c.startTime; + this.endTime = c.endTime; + this.startNanos = c.startNanos; + this.endNanos = c.endNanos; + } + + private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { + InternalEventFilter ef = new InternalEventFilter(); + for (EventDispatcher ed : dispatchers) { + String name = ed.eventName; + if (name == null) { + return InternalEventFilter.ACCEPT_ALL; + } + ef.setThreshold(name, 0); + } + return ef; + } + + protected final void dispatch(RecordedEvent event) { + EventType type = event.getEventType(); + EventDispatcher[] dispatchers = null; + if (type == cacheEventType) { + dispatchers = cacheDispatchers; + } else { + dispatchers = dispatcherLookup.get(type.getId()); + if (dispatchers == null) { + List list = new ArrayList<>(); + for (EventDispatcher e : this.dispatchers) { + if (e.accepts(type)) { + list.add(e); + } + } + dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]); + dispatcherLookup.put(type.getId(), dispatchers); + } + cacheDispatchers = dispatchers; + } + for (int i = 0; i < dispatchers.length; i++) { + try { + dispatchers[i].offer(event); + } catch (Exception e) { + handleError(e); + } + } + } + + public void handleError(Throwable e) { + Consumer[] consumers = this.errorActions; + if (consumers.length == 0) { + defaultErrorHandler(e); + return; + } + for (int i = 0; i < consumers.length; i++) { + @SuppressWarnings("unchecked") + Consumer conusmer = (Consumer) consumers[i]; + conusmer.accept(e); + } + } + + public void runFlushActions() { + Runnable[] flushActions = this.flushActions; + for (int i = 0; i < flushActions.length; i++) { + try { + flushActions[i].run(); + } catch (Exception e) { + handleError(e); + } + } + } + + public void runCloseActions() { + Runnable[] closeActions = this.closeActions; + for (int i = 0; i < closeActions.length; i++) { + try { + closeActions[i].run(); + } catch (Exception e) { + handleError(e); + } + } + } + + void defaultErrorHandler(Throwable e) { + e.printStackTrace(); + } +} diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Sep 13 18:46:07 2019 +0200 @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Objects; +import jdk.jfr.consumer.ChunkParser.ParserConfiguration; import jdk.jfr.internal.Utils; import jdk.jfr.internal.consumer.FileAccess; import jdk.jfr.internal.consumer.RecordingInput; @@ -46,6 +47,7 @@ private final RepositoryFiles repositoryFiles; private final boolean active; private final FileAccess fileAccess; + private ChunkParser chunkParser; private long chunkStartNanos; private RecordedEvent[] sortedList; @@ -60,7 +62,7 @@ @Override public void close() { setClosed(true); - runCloseActions(); + dispatcher().runCloseActions(); repositoryFiles.close(); } @@ -76,11 +78,12 @@ @Override protected void process() throws Exception { - StreamConfiguration c = configuration; + Dispatcher disp = dispatcher(); + Path path; - boolean validStartTime = active || c.getStartTime() != null; + boolean validStartTime = active || disp.startTime != null; if (validStartTime) { - path = repositoryFiles.firstPath(c.getStartNanos()); + path = repositoryFiles.firstPath(disp.startNanos); } else { path = repositoryFiles.lastPath(); } @@ -89,28 +92,24 @@ } chunkStartNanos = repositoryFiles.getTimestamp(path); try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) { - chunkParser = new ChunkParser(input, c.getReuse()); + chunkParser = new ChunkParser(input, disp.parserConfiguration); long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration(); - long filtertStart = validStartTime ? c.getStartNanos() : segmentStart; - long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE; + long filterStart = validStartTime ? disp.startNanos : segmentStart; + long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE; + while (!isClosed()) { boolean awaitnewEvent = false; while (!isClosed() && !chunkParser.isChunkFinished()) { - c = configuration; - boolean ordered = c.getOrdered(); + disp = dispatcher(); + ParserConfiguration pc = disp.parserConfiguration; + pc.filterStart = filterStart; + pc.filterEnd = filterEnd; + chunkParser.updateConfiguration(pc, true); chunkParser.setFlushOperation(getFlushOperation()); - chunkParser.setReuse(c.getReuse()); - chunkParser.setOrdered(ordered); - chunkParser.setFilterStart(filtertStart); - chunkParser.setFilterEnd(filterEnd); - chunkParser.resetEventCache(); - chunkParser.setParserFilter(c.getFilter()); - chunkParser.updateEventParsers(); - c.clearDispatchCache(); - if (ordered) { - awaitnewEvent = processOrdered(c, awaitnewEvent); + if (pc.ordered) { + awaitnewEvent = processOrdered(disp, awaitnewEvent); } else { - awaitnewEvent = processUnordered(c, awaitnewEvent); + awaitnewEvent = processUnordered(disp, awaitnewEvent); } if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) { close(); @@ -135,7 +134,7 @@ } } - private boolean processOrdered(StreamConfiguration c, boolean awaitNewEvents) throws IOException { + private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException { if (sortedList == null) { sortedList = new RecordedEvent[100_000]; } @@ -164,18 +163,18 @@ Arrays.sort(sortedList, 0, index, END_TIME); } for (int i = 0; i < index; i++) { - dispatch(c, sortedList[i]); + c.dispatch(sortedList[i]); } return awaitNewEvents; } - private boolean processUnordered(StreamConfiguration c, boolean awaitNewEvents) throws IOException { + private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException { while (true) { RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); if (e == null) { return true; } else { - dispatch(c, e); + c.dispatch(e); } } } diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Sep 13 18:46:07 2019 +0200 @@ -62,7 +62,7 @@ @Override public void close() { setClosed(true); - runCloseActions(); + dispatcher().runCloseActions(); try { input.close(); } catch (IOException e) { @@ -72,46 +72,40 @@ @Override protected void process() throws IOException { - StreamConfiguration c = configuration; + Dispatcher disp = dispatcher(); long start = 0; long end = Long.MAX_VALUE; - if (c.getStartTime() != null) { - start = c.getStartNanos(); + if (disp.startTime != null) { + start = disp.startNanos; } - if (c.getEndTime() != null) { - end = c.getEndNanos(); + if (disp.endTime != null) { + end = disp.endNanos; } - chunkParser = new ChunkParser(input, c.getReuse()); + chunkParser = new ChunkParser(input, disp.parserConfiguration); while (!isClosed()) { if (chunkParser.getStartNanos() > end) { close(); return; } - c = configuration; - boolean ordered = c.getOrdered(); + disp = dispatcher(); + disp.parserConfiguration.filterStart = start; + disp.parserConfiguration.filterEnd = end; + chunkParser.updateConfiguration(disp.parserConfiguration, true); chunkParser.setFlushOperation(getFlushOperation()); - chunkParser.setFilterStart(start); - chunkParser.setFilterEnd(end); - chunkParser.setReuse(c.getReuse()); - chunkParser.setOrdered(ordered); - chunkParser.resetEventCache(); - chunkParser.setParserFilter(c.getFiler()); - chunkParser.updateEventParsers(); - c.clearDispatchCache(); - if (ordered) { - processOrdered(c); + if (disp.parserConfiguration.ordered) { + processOrdered(disp); } else { - processUnordered(c); + processUnordered(disp); } - if (chunkParser.isLastChunk()) { + if (isClosed() || chunkParser.isLastChunk()) { return; } chunkParser = chunkParser.nextChunkParser(); } } - private void processOrdered(StreamConfiguration c) throws IOException { + private void processOrdered(Dispatcher c) throws IOException { if (sortedList == null) { sortedList = new RecordedEvent[10_000]; } @@ -122,7 +116,7 @@ if (event == null) { Arrays.sort(sortedList, 0, index, END_TIME); for (int i = 0; i < index; i++) { - dispatch(c, sortedList[i]); + c.dispatch(sortedList[i]); } return; } @@ -135,13 +129,13 @@ } } - private void processUnordered(StreamConfiguration c) throws IOException { + private void processUnordered(Dispatcher c) throws IOException { while (!isClosed()) { RecordedEvent event = chunkParser.readEvent(); if (event == null) { return; } - dispatch(c, event); + c.dispatch(event); } } } diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri Sep 13 18:46:07 2019 +0200 @@ -40,12 +40,60 @@ /** * Represents a stream of events. + *

+ * A stream is a sequence of events and the way to interact with a stream is to + * register actions. + *

+ * To receive a notification when an event arrives, register an action using the + * {@link #onEvent(Consumer)} method. To filter the stream for an event with a + * specific name, use {@link #onEvent(String, Consumer)} method. + * + * By default, the same {@code RecordedEvent} object can be used for + * representing two or more distinct events. The object can be delivered + * multiple times to the same action as well as to other actions. If the life + * cycle of the event object is needed outside the scope of an action, the + * {@link #setReuse(boolean)} method should be set to {@code false} so that a + * new object is allocated for each event. + * + *

+ * Events are delivered in batches. To receive a notification when a batch is + * complete, register an action using the {@link #onFlush(Runnable)} method. + * This is an opportunity to aggregate or push data to external systems while + * the Java Virtual Machine (JVM) is preparing the next batch. + *

+ * Events within a batch are sorted chronologically by their end time. If + * ordering is not a concern, sorting can be disabled using the + * {@link #setOrdered(boolean)} method. + *

+ * To dispatch events to registered actions, the stream must be started. To + * start processing in the current thread, invoke the {@link #start()} method. + * To process actions asynchronously in a separate thread, invoke the + * {@link #startAsync()} method. To await completion of the stream, use the + * awaitTermination {@link #awaitTermination()} or the {link + * {@link #awaitTermination(Duration)} method. + *

+ * When a stream ends it is automatically closed. To manually stop processing of + * events, close the stream with the {@link #close()} method. A stream can also + * be automatically closed in exceptional circumstances, for instance if the JVM + * exits. To receive a notification in any of these occasions, use the + * {@link #onClose(Runnable)} method to register an action. + *

+ * If an unexpected exception occurs in an action, it is possible to catch the + * exception in an error handler. An error handler can be registered using the + * {@link #onError(Runnable)} method. If no error handler is registered, the + * default behavior is to print the exception and its backtrace to the standard + * error stream. + *

+ * The following example demonstrates how an {@code EventStream} can be used to + * listen to garbage collection and CPU Load events + *

+ * */ public interface EventStream extends AutoCloseable { /** - * Creates a stream from the disk repository of the current Java Virtual - * Machine (JVM). + * Creates a stream from the repository of the current Java Virtual Machine + * (JVM). *

* By default, the stream starts with the next event flushed by Flight * Recorder. @@ -96,7 +144,7 @@ * * @return an event stream, not {@code null} * - * @throws IOException if a stream can't be opened, or an I/O error occurs + * @throws IOException if the file can't be opened, or an I/O error occurs * during reading * * @throws SecurityException if a security manager exists and its @@ -107,71 +155,75 @@ } /** - * Performs an action on all events in the stream. + * Registers an action to perform on all events in the stream. * - * @param action an action to be performed on each {@code RecordedEvent}, - * not {@code null} + * @param action an action to perform on each {@code RecordedEvent}, not + * {@code null} */ void onEvent(Consumer action); /** - * Performs an action on all events in the stream with a specified name. + * Registers an action to perform on all events matching a name. * * @param eventName the name of the event, not {@code null} * - * @param action an action to be performed on each {@code RecordedEvent} - * that matches the event name, not {@code null} + * @param action an action to perform on each {@code RecordedEvent} matching + * the event name, not {@code null} */ void onEvent(String eventName, Consumer action); /** - * Performs an action when the event stream has been flushed. + * Registers an action to perform after the stream has been flushed. * - * @param action an action to be performed after stream has been flushed, - * not {@code null} + * @param action an action to perform after the stream has been + * flushed, not {@code null} */ void onFlush(Runnable action); /** - * Performs an action if an exception occurs when processing the stream. + * Registers an action to perform if an exception occurs. *

- * if an error handler has not been added to the stream, an exception stack - * trace is printed to standard error. + * if an action is not registered, an exception stack trace is printed to + * standard error. *

- * Adding an error handler overrides the default behavior. If multiple error - * handlers have been added, they will be executed in the order they were - * added. + * Registering an action overrides the default behavior. If multiple actions + * have been registered, they are performed in the order of registration. + *

+ * If this method itself throws an exception, resulting behavior is + * undefined. * - * @param action an action to be performed if an exception occurs, not + * @param action an action to perform if an exception occurs, not * {@code null} */ void onError(Consumer action); /** - * Performs an action when the event stream is closed. + * Registers an action to perform when the stream is closed. *

- * If the stream is already closed, the action will be executed immediately + * If the stream is already closed, the action will be performed immediately * in the current thread. * - * @param action an action to be performed after the stream has been closed, - * not {@code null} + * @param action an action to perform after the stream is closed, not + * {@code null} + * @see #close() */ void onClose(Runnable action); /** - * Releases all resources associated with this event stream. + * Releases all resources associated with this stream. */ void close(); /** - * Removes an action from the stream. + * Unregisters an action. *

- * If the action has been added multiple times, all instance of it will be - * removed. + * If the action has been registered multiple times, all instances are + * unregistered. * - * @param action the action to remove, not {@code null} + * @param action the action to unregister, not {@code null} * - * @return {@code true} if the action was removed, {@code false} otherwise + * @return {@code true} if the action was unregistered, {@code false} + * otherwise * * @see #onEvent(Consumer) * @see #onEvent(String, Consumer) @@ -183,91 +235,98 @@ /** * Specifies that the event object in an {@link #onEvent(Consumer)} action - * is to be reused. + * can be reused. *

- * If reuse is set to {@code true), a callback should not keep a reference - * to the event object after the callback from {@code onEvent} has returned. + * If reuse is set to {@code true), an action should not keep a reference + * to the event object after the action has completed. * - * @param resuse if event objects can be reused between calls to - * {@code #onEvent(Consumer)} - * + * @param reuse {@code true} if an event object can be reused, {@code false} + * otherwise */ - public void setReuse(boolean reuse); + void setReuse(boolean reuse); /** * Specifies that events arrives in chronological order, sorted by the time - * they were committed to the event stream. + * they were committed to the stream. * * @param ordered if event objects arrive in chronological order to * {@code #onEvent(Consumer)} */ - public void setOrdered(boolean ordered); + void setOrdered(boolean ordered); /** - * Specifies start time of the event stream. + * Specifies the start time of the stream. *

- * The start time must be set before the stream is started. + * The start time must be set before starting the stream * * @param startTime the start time, not {@code null} * - * @throws IllegalStateException if the stream has already been started + * @throws IllegalStateException if the stream is already started + * + * @see #start() + * @see #startAsync() */ - public void setStartTime(Instant startTime); + void setStartTime(Instant startTime); /** - * Specifies end time of the event stream. + * Specifies the end time of the stream. *

- * The end time must be set before the stream is started. + * The end time must be set before starting the stream. *

- * When the end time is reached the stream is closed. + * At end time, the stream is closed. * * @param endTime the end time, not {@code null} * - * @throws IllegalStateException if the stream has already been started + * @throws IllegalStateException if the stream is already started + * + * @see #start() + * @see #startAsync() */ - public void setEndTime(Instant endTime); + void setEndTime(Instant endTime); /** - * Start processing events in the stream. + * Start processing of actions. *

- * All actions performed on this stream will happen in the current thread. + * Actions are performed in the current thread. * - * @throws IllegalStateException if the stream is already started or if it - * has been closed + * @throws IllegalStateException if the stream is already started or closed */ void start(); /** - * Start processing events in the stream asynchronously. + * Start asynchronous processing of actions. *

- * All actions on this stream will be performed in a separate thread. + * Actions are performed in a single separate thread. * - * @throws IllegalStateException if the stream is already started, or if it - * has been closed + * @throws IllegalStateException if the stream is already started or closed */ void startAsync(); /** - * Blocks the current thread until the stream is finished, closed, or it - * times out. + * Blocks until all actions are completed, or the stream is closed, or the + * timeout occurs, or the current thread is interrupted, whichever happens + * first. * * @param timeout the maximum time to wait, not {@code null} * * @throws IllegalArgumentException if timeout is negative - * @throws InterruptedException + * @throws InterruptedException if interrupted while waiting * * @see #start() * @see #startAsync() + * @see Thread#interrupt() */ void awaitTermination(Duration timeout) throws InterruptedException; /** - * Blocks the current thread until the stream is finished or closed. + * Blocks until all actions are completed, or the stream is closed, or the + * current thread is interrupted, whichever happens first. * - * @throws InterruptedException + * @throws InterruptedException if interrupted while waiting * * @see #start() * @see #startAsync() + * @see Thread#interrupt() */ void awaitTermination() throws InterruptedException; } \ No newline at end of file diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java Fri Sep 13 18:46:07 2019 +0200 @@ -238,7 +238,7 @@ private void findNext() throws IOException { while (nextEvent == null) { if (chunkParser == null) { - chunkParser = new ChunkParser(input, false); + chunkParser = new ChunkParser(input); } else if (!chunkParser.isLastChunk()) { chunkParser = chunkParser.nextChunkParser(); } else { diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Fri Sep 13 18:46:07 2019 +0200 @@ -47,7 +47,7 @@ * A recording stream produces events from the current JVM (Java Virtual * Machine). *

- * The following example, shows how to record events using the default + * The following example shows how to record events using the default * configuration and print the Garbage Collection, CPU Load and JVM Information * event to standard out. * @@ -110,7 +110,7 @@ * * * - * @param configuration configuration that contains the settings to be use, + * @param configuration configuration that contains the settings to use, * not {@code null} * * @throws IllegalStateException if Flight Recorder can't be created (for @@ -146,16 +146,17 @@ } /** - * Replaces all settings for this recording stream + * Replaces all settings for this recording stream. *

- * The following example records 20 second using the "default" configuration - * and then changes to settings for the "profile" configuration. + * The following example records 20 seconds using the "default" configuration + * and then changes settings to the "profile" configuration. * *

      * 
-     *     var defaultConfiguration = Configuration.getConfiguration("default");
-     *     var profileConfiguration = Configuration.getConfiguration("profile");
-     *     try (var rs = new RecordingStream(defaultConfiguration) {
+     *     Configuration defaultConfiguration = Configuration.getConfiguration("default");
+     *     Configuration profileConfiguration = Configuration.getConfiguration("profile");
+     *     try (RecordingStream rs = new RecordingStream(defaultConfiguration) {
+     *        rs.onEvent(System.out::println);
      *        rs.startAsync();
      *        Thread.sleep(20_000);
      *        rs.setSettings(profileConfiguration.getSettings());
@@ -165,6 +166,8 @@
      * 
* * @param settings the settings to set, not {@code null} + * + * @see Recording#setSettings(Map) */ public void setSettings(Map settings) { recording.setSettings(settings); @@ -217,8 +220,7 @@ } /** - * Determines how far back data is kept for the stream, if the stream can't - * keep up. + * Determines how far back data is kept for the stream. *

* To control the amount of recording data stored on disk, the maximum * length of time to retain the data can be specified. Data stored on disk @@ -241,8 +243,7 @@ } /** - * Determines how much data is kept in the disk repository if the stream - * can't keep up. + * Determines how much data is kept for the stream. *

* To control the amount of recording data that is stored on disk, the * maximum amount of data to retain can be specified. When the maximum limit diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java Fri Sep 13 18:46:07 2019 +0200 @@ -2,267 +2,101 @@ import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.function.Consumer; -import jdk.jfr.EventType; -import jdk.jfr.consumer.AbstractEventStream.EventDispatcher; -import jdk.jfr.internal.LongMap; +import jdk.jfr.consumer.Dispatcher.EventDispatcher; import jdk.jfr.internal.Utils; -import jdk.jfr.internal.consumer.InternalEventFilter; final class StreamConfiguration { - private static final Runnable[] NO_ACTIONS = new Runnable[0]; - - Consumer[] errorActions = new Consumer[0]; - private Runnable[] flushActions = NO_ACTIONS; - private Runnable[] closeActions = NO_ACTIONS; - private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS; - private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; - LongMap dispatcherLookup = new LongMap<>(); + final List closeActions = new ArrayList<>(); + final List flushActions = new ArrayList<>(); + final List eventActions = new ArrayList<>(); + final List> errorActions = new ArrayList<>(); - private boolean changedConfiguration = false; - private boolean closed = false; - private boolean reuse = true; - private boolean ordered = true; - private Instant startTime = null; - private Instant endTime = null; - private boolean started = false; - private long startNanos = 0; - private long endNanos = Long.MAX_VALUE; + boolean reuse = true; + boolean ordered = true; + Instant startTime = null; + Instant endTime = null; + boolean started = false; + long startNanos = 0; + long endNanos = Long.MAX_VALUE; - // Cache the last event type and dispatch. - EventType cacheEventType; - EventDispatcher[] cacheDispatchers; - + volatile boolean changed = true; - public StreamConfiguration(StreamConfiguration configuration) { - this.flushActions = configuration.flushActions; - this.closeActions = configuration.closeActions; - this.errorActions = configuration.errorActions; - this.dispatchers = configuration.dispatchers; - this.eventFilter = configuration.eventFilter; - this.closed = configuration.closed; - this.reuse = configuration.reuse; - this.ordered = configuration.ordered; - this.startTime = configuration.startTime; - this.endTime = configuration.endTime; - this.started = configuration.started; - this.startNanos = configuration.startNanos; - this.endNanos = configuration.endNanos; - this.dispatcherLookup = configuration.dispatcherLookup; + public synchronized boolean remove(Object action) { + boolean removed = false; + removed |= flushActions.removeIf(e -> e == action); + removed |= closeActions.removeIf(e -> e == action); + removed |= errorActions.removeIf(e -> e == action); + removed |= eventActions.removeIf(e -> e.action == action); + if (removed) { + changed = true; + } + return removed; } - public StreamConfiguration() { - } - - public StreamConfiguration remove(Object action) { - flushActions = remove(flushActions, action); - closeActions = remove(closeActions, action); - errorActions = remove(errorActions, action); - dispatchers = removeDispatch(dispatchers, action); - return this; - } - - public StreamConfiguration addDispatcher(EventDispatcher e) { - dispatchers = add(dispatchers, e); - eventFilter = buildFilter(dispatchers); - dispatcherLookup = new LongMap<>(); - return this; - } - - public StreamConfiguration addFlushAction(Runnable action) { - flushActions = add(flushActions, action); - return this; + public synchronized void addEventAction(String name, Consumer consumer) { + eventActions.add(new EventDispatcher(name, consumer)); + changed = true; } - public StreamConfiguration addCloseAction(Runnable action) { - closeActions = add(closeActions, action); - return this; - } - - public StreamConfiguration addErrorAction(Consumer action) { - errorActions = add(errorActions, action); - return this; - } - - public StreamConfiguration setClosed(boolean closed) { - this.closed = closed; - changedConfiguration = true; - return this; - } - - public boolean isClosed() { - return closed; - } - - public Runnable[] getCloseActions() { - return closeActions; + public void addEventAction(Consumer action) { + addEventAction(null, action); } - public Runnable[] getFlushActions() { - return flushActions; + public synchronized void addFlushAction(Runnable action) { + flushActions.add(action); + changed = true; } - private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) { - List list = new ArrayList<>(array.length); - boolean modified = false; - for (int i = 0; i < array.length; i++) { - if (array[i].action != action) { - list.add(array[i]); - } else { - modified = true; - } - } - EventDispatcher[] result = list.toArray(new EventDispatcher[0]); - if (modified) { - eventFilter = buildFilter(result); - dispatcherLookup = new LongMap<>(); - changedConfiguration = true; - } - return result; + public synchronized void addCloseAction(Runnable action) { + closeActions.add(action); + changed = true; } - private T[] remove(T[] array, Object action) { - List list = new ArrayList<>(array.length); - for (int i = 0; i < array.length; i++) { - if (array[i] != action) { - list.add(array[i]); - } else { - changedConfiguration = true; - } - } - return list.toArray(array); + public synchronized void addErrorAction(Consumer action) { + errorActions.add(action); + changed = true; } - private T[] add(T[] array, T object) { - List list = new ArrayList<>(Arrays.asList(array)); - list.add(object); - changedConfiguration = true; - return list.toArray(array); + public synchronized void setReuse(boolean reuse) { + this.reuse = reuse; + changed = true; } - private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { - InternalEventFilter ef = new InternalEventFilter(); - for (EventDispatcher ed : dispatchers) { - String name = ed.eventName; - if (name == null) { - return InternalEventFilter.ACCEPT_ALL; - } - ef.setThreshold(name, 0); - } - return ef; + public synchronized void setOrdered(boolean ordered) { + this.ordered = ordered; + changed = true; } - public StreamConfiguration setReuse(boolean reuse) { - this.reuse = reuse; - changedConfiguration = true; - return this; - } - - public StreamConfiguration setOrdered(boolean ordered) { - this.ordered = ordered; - changedConfiguration = true; - return this; - } - - public StreamConfiguration setEndTime(Instant endTime) { + public synchronized void setEndTime(Instant endTime) { this.endTime = endTime; this.endNanos = Utils.timeToNanos(endTime); - changedConfiguration = true; - return this; + changed = true; } - public StreamConfiguration setStartTime(Instant startTime) { + public synchronized void setStartTime(Instant startTime) { this.startTime = startTime; this.startNanos = Utils.timeToNanos(startTime); - changedConfiguration = true; - return this; - } - - public Instant getStartTime() { - return startTime; + changed = true; } - public Object getEndTime() { - return endTime; - } - - public boolean isStarted() { - return started; + public synchronized void setStartNanos(long startNanos) { + this.startNanos = startNanos; + changed = true; } - public StreamConfiguration setStartNanos(long startNanos) { - this.startNanos = startNanos; - changedConfiguration = true; - return this; - } - - public void setStarted(boolean started) { + public synchronized void setStarted(boolean started) { this.started = started; - changedConfiguration = true; + changed = true; } public boolean hasChanged() { - return changedConfiguration; - } - - public boolean getReuse() { - return reuse; - } - - public boolean getOrdered() { - return ordered; - } - - public InternalEventFilter getFiler() { - return eventFilter; - } - - public long getStartNanos() { - return startNanos; - } - - public long getEndNanos() { - return endNanos; - } - - public InternalEventFilter getFilter() { - return eventFilter; + return changed; } - public void clearDispatchCache() { - cacheDispatchers = null; - cacheEventType = null; - } - - public String toString() { - StringBuilder sb = new StringBuilder(); - for (Runnable flush : flushActions) { - sb.append("Flush Action: ").append(flush).append("\n"); - } - for (Runnable close : closeActions) { - sb.append("Close Action: " + close + "\n"); - } - for (Consumer error : errorActions) { - sb.append("Error Action: " + error + "\n"); - } - for (EventDispatcher dispatcher : dispatchers) { - sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n"); - } - sb.append("Closed: ").append(closed).append("\n"); - sb.append("Reuse: ").append(reuse).append("\n"); - sb.append("Ordered: ").append(ordered).append("\n"); - sb.append("Started: ").append(started).append("\n"); - sb.append("Start Time: ").append(startTime).append("\n"); - sb.append("Start Nanos: ").append(startNanos).append("\n"); - sb.append("End Time: ").append(endTime).append("\n"); - sb.append("End Nanos: ").append(endNanos).append("\n"); - return sb.toString(); - } - - EventDispatcher[] getDispatchers() { - return dispatchers; + public synchronized void clearChanged() { + changed = false; } } \ No newline at end of file diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/internal/EventInstrumentation.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/EventInstrumentation.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/EventInstrumentation.java Fri Sep 13 18:46:07 2019 +0200 @@ -121,7 +121,7 @@ private final boolean untypedEventHandler; private boolean guardHandlerReference; private Class superClass; - private final static boolean streamingCommit = true; //!SecuritySupport.getBooleanProperty("jfr.instrument.streaming"); + private final static boolean streamingCommit = false; //!SecuritySupport.getBooleanProperty("jfr.instrument.streaming"); EventInstrumentation(Class superClass, byte[] bytes, long id) { this.superClass = superClass; diff -r 6f8f18ac1d54 -r 7b751fe181a5 src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java Thu Sep 12 20:46:55 2019 -0700 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java Fri Sep 13 18:46:07 2019 +0200 @@ -601,6 +601,7 @@ try { Thread.sleep(millis); } catch (InterruptedException e) { + // ok } } diff -r 6f8f18ac1d54 -r 7b751fe181a5 test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java --- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Thu Sep 12 20:46:55 2019 -0700 +++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java Fri Sep 13 18:46:07 2019 +0200 @@ -78,7 +78,7 @@ RecordingStream r = new RecordingStream(); AtomicLong count = new AtomicLong(); r.onEvent(e -> { - if (count.incrementAndGet() == 100) { + if (count.incrementAndGet() > 100) { streaming.countDown(); } });