# HG changeset patch # User egahlin # Date 1564574864 -7200 # Node ID 838f9a7635b61631472038eaec805fc7a8c9cf81 # Parent 025c9b8eaefdf588f5cf0e0a854427bf61c4d3cd Cleaner stream reconfiguration + reduced allocation in JFR framework diff -r 025c9b8eaefd -r 838f9a7635b6 src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Wed Jul 31 14:07:44 2019 +0200 @@ -0,0 +1,547 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.jfr.consumer; + +import java.io.IOException; +import java.lang.invoke.VarHandle; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + +import jdk.jfr.EventType; +import jdk.jfr.internal.LogLevel; +import jdk.jfr.internal.LogTag; +import jdk.jfr.internal.Logger; +import jdk.jfr.internal.LongMap; +import jdk.jfr.internal.consumer.InternalEventFilter; + +/* + * Purpose of this class is to simplify the implementation of + * an event stream. In particular, it handles: + * + * - configuration storage + * - atomic updates to a configuration + * - dispatch mechanism + * - error handling + * - security + * + */ +abstract class AbstractEventStream implements Runnable { + + public static final class StreamConfiguration { + private static final Runnable[] NO_ACTIONS = new Runnable[0]; + + private Runnable[] flushActions = NO_ACTIONS; + private Runnable[] closeActions = NO_ACTIONS; + private EventDispatcher[] dispatchers = NO_DISPATCHERS; + private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; + private boolean closed = false; + private boolean reuse = true; + private boolean ordered = true; + private Instant startTime = null; + private boolean started = false; + private long startNanos = 0; + private LongMap dispatcherLookup = new LongMap<>(); + + private boolean changed = false; + + public StreamConfiguration(StreamConfiguration configuration) { + this.flushActions = configuration.flushActions; + this.closeActions = configuration.closeActions; + this.dispatchers = configuration.dispatchers; + this.eventFilter = configuration.eventFilter; + this.closed = configuration.closed; + this.reuse = configuration.reuse; + this.ordered = configuration.ordered; + this.startTime = configuration.startTime; + this.started = configuration.started; + this.startNanos = configuration.startNanos; + this.dispatcherLookup = configuration.dispatcherLookup; + } + + public StreamConfiguration() { + } + + final public StreamConfiguration remove(Object action) { + flushActions = remove(flushActions, action); + closeActions = remove(closeActions, action); + dispatchers = removeDispatch(dispatchers, action); + return this; + } + + final public StreamConfiguration addDispatcher(EventDispatcher e) { + dispatchers = add(dispatchers, e); + eventFilter = buildFilter(dispatchers); + dispatcherLookup = new LongMap<>(); + return this; + } + + final public StreamConfiguration addFlushAction(Runnable action) { + flushActions = add(flushActions, action); + return this; + } + + final public StreamConfiguration addCloseAction(Runnable action) { + closeActions = add(closeActions, action); + return this; + } + + final public StreamConfiguration setClosed(boolean closed) { + this.closed = closed; + changed = true; + return this; + } + + final public boolean isClosed() { + return closed; + } + + final public Runnable[] getCloseActions() { + return closeActions; + } + + final public Runnable[] getFlushActions() { + return flushActions; + } + + private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) { + List list = new ArrayList<>(array.length); + boolean modified = false; + for (int i = 0; i < array.length; i++) { + if (array[i].action != action) { + list.add(array[i]); + } else { + modified = true; + } + } + EventDispatcher[] result = list.toArray(new EventDispatcher[0]); + if (modified) { + eventFilter = buildFilter(result); + dispatcherLookup = new LongMap<>(); + changed = true; + } + return result; + } + + private T[] remove(T[] array, Object action) { + List list = new ArrayList<>(array.length); + for (int i = 0; i < array.length; i++) { + if (array[i] != action) { + list.add(array[i]); + } else { + changed = true; + } + } + return list.toArray(array); + } + + private T[] add(T[] array, T object) { + List list = new ArrayList<>(Arrays.asList(array)); + list.add(object); + changed = true; + return list.toArray(array); + } + + private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { + InternalEventFilter ef = new InternalEventFilter(); + for (EventDispatcher ed : dispatchers) { + String name = ed.eventName; + if (name == null) { + return InternalEventFilter.ACCEPT_ALL; + } + ef.setThreshold(name, 0); + } + return ef.threadSafe(); + } + + final public StreamConfiguration setReuse(boolean reuse) { + this.reuse = reuse; + changed = true; + return this; + } + + final public StreamConfiguration setOrdered(boolean ordered) { + this.ordered = ordered; + changed = true; + return this; + } + + final public StreamConfiguration setStartTime(Instant startTime) { + this.startTime = startTime; + changed = true; + return this; + } + + final public Instant getStartTime() { + return startTime; + } + + final public boolean isStarted() { + return started; + } + + final public StreamConfiguration setStartNanos(long startNanos) { + this.startNanos = startNanos; + changed = true; + return this; + } + + final public void setStarted(boolean started) { + this.started = started; + changed = true; + } + + final public boolean hasChanged() { + return changed; + } + + final public boolean getReuse() { + return reuse; + } + + final public boolean getOrdered() { + return ordered; + } + + final public InternalEventFilter getFiler() { + return eventFilter; + } + + final public long getStartNanos() { + return startNanos; + } + + final public InternalEventFilter getFilter() { + return eventFilter; + } + + final public String toString() { + StringBuilder sb = new StringBuilder(); + for (Runnable flush : flushActions) { + sb.append("Flush Action: ").append(flush).append("\n"); + } + for (Runnable close : closeActions) { + sb.append("Close Action: " + close + "\n"); + } + for (EventDispatcher dispatcher : dispatchers) { + sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n"); + } + sb.append("Closed: ").append(closed).append("\n"); + sb.append("Reuse: ").append(reuse).append("\n"); + sb.append("Ordered: ").append(ordered).append("\n"); + sb.append("Started: ").append(started).append("\n"); + sb.append("Start Time: ").append(startTime).append("\n"); + sb.append("Start Nanos: ").append(startNanos).append("\n"); + return sb.toString(); + } + + private EventDispatcher[] getDispatchers() { + return dispatchers; + } + } + + final static class EventDispatcher { + final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; + + final private String eventName; + final private Consumer action; + + public EventDispatcher(Consumer action) { + this(null, action); + } + + public EventDispatcher(String eventName, Consumer action) { + this.eventName = eventName; + this.action = action; + } + + public void offer(RecordedEvent event) { + action.accept(event); + } + + public boolean accepts(EventType eventType) { + return (eventName == null || eventType.getName().equals(eventName)); + } + } + + public final static Instant NEXT_EVENT = Instant.now(); + public final static Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); + + private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; + private final AccessControlContext accessControlContext; + private final Thread thread; + + // Update bu updateConfiguration() + protected StreamConfiguration configuration = new StreamConfiguration(); + + // Cache the last event type and dispatch. + private EventType lastEventType; + private EventDispatcher[] lastEventDispatch; + + public AbstractEventStream(AccessControlContext acc) throws IOException { + this.accessControlContext = acc; + // Create thread object in constructor to ensure caller has permission + // permission before constructing object + thread = new Thread(this); + thread.setDaemon(true); + } + + public final void run() { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + execute(); + return null; + } + }, accessControlContext); + + } + + private void execute() { + // JVM.getJVM().exclude(Thread.currentThread()); + try { + updateStartNanos(); + process(); + } catch (IOException e) { + if (!isClosed()) { + logException(e); + } + } catch (Exception e) { + logException(e); + } finally { + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); + } + } + + // User setting overrides default + private void updateStartNanos() { + if (configuration.getStartTime() != null) { + StreamConfiguration c = new StreamConfiguration(configuration); + try { + c.setStartNanos(c.getStartTime().toEpochMilli() * 1_000_000L); + } catch (ArithmeticException ae) { + c.setStartNanos(Long.MAX_VALUE); + } + updateConfiguration(c); + } + } + + private void logException(Exception e) { + // FIXME: e.printStackTrace(); for debugging purposes, + // remove before before integration + e.printStackTrace(); + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage()); + } + + public abstract void process() throws IOException; + + protected final void clearLastDispatch() { + lastEventDispatch = null; + lastEventType = null; + } + + protected final void dispatch(RecordedEvent event) { + EventType type = event.getEventType(); + EventDispatcher[] ret = null; + if (type == lastEventType) { + ret = lastEventDispatch; + } else { + ret = configuration.dispatcherLookup.get(type.getId()); + if (ret == null) { + List list = new ArrayList<>(); + for (EventDispatcher e : configuration.getDispatchers()) { + if (e.accepts(type)) { + list.add(e); + } + } + ret = list.isEmpty() ? NO_DISPATCHERS : list.toArray(new EventDispatcher[0]); + configuration.dispatcherLookup.put(type.getId(), ret); + } + lastEventDispatch = ret; + } + for (int i = 0; i < ret.length; i++) { + try { + ret[i].offer(event); + } catch (Exception e) { + logException(e); + } + } + } + + public final void runCloseActions() { + Runnable[] cas = configuration.getCloseActions(); + for (int i = 0; i < cas.length; i++) { + try { + cas[i].run(); + } catch (Exception e) { + logException(e); + } + } + } + + public final void runFlushActions() { + Runnable[] fas = configuration.getFlushActions(); + for (int i = 0; i < fas.length; i++) { + try { + fas[i].run(); + } catch (Exception e) { + logException(e); + } + } + } + + // Purpose of synchronizing the following methods is + // to serialize changes to the configuration, so only one + // thread at a time can change the configuration. + // + // The purpose is not to guard the configuration field. A new + // configuration is published using updateConfiguration + // + public final synchronized boolean remove(Object action) { + return updateConfiguration(new StreamConfiguration(configuration).remove(action)); + } + + public final synchronized void onEvent(Consumer action) { + add(new EventDispatcher(action)); + } + + public final synchronized void onEvent(String eventName, Consumer action) { + add(new EventDispatcher(eventName, action)); + } + + private final synchronized void add(EventDispatcher e) { + updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e)); + } + + public final synchronized void onFlush(Runnable action) { + updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action)); + } + + public final synchronized void addCloseAction(Runnable action) { + updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action)); + } + + public final synchronized void setClosed(boolean closed) { + updateConfiguration(new StreamConfiguration(configuration).setClosed(closed)); + } + + public final synchronized void setReuse(boolean reuse) { + updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse)); + } + + public final synchronized void setOrdered(boolean ordered) { + updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered)); + } + + public final synchronized void setStartNanos(long startNanos) { + updateConfiguration(new StreamConfiguration(configuration).setStartNanos(startNanos)); + } + + public final synchronized void setStartTime(Instant startTime) { + Objects.nonNull(startTime); + if (configuration.isStarted()) { + throw new IllegalStateException("Stream is already started"); + } + if (startTime == null) { + return; + } + if (startTime.isBefore(Instant.EPOCH)) { + startTime = Instant.EPOCH; + } + updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime)); + } + + private boolean updateConfiguration(StreamConfiguration newConfiguration) { + // Changes to the configuration must be serialized, so make + // sure that we have the monitor + Thread.holdsLock(this); + if (newConfiguration.hasChanged()) { + // Publish objects indirectly held by new configuration object + VarHandle.releaseFence(); + configuration = newConfiguration; + // Publish the field reference. Making the field volatile + // would be an alternative, but it is repeatedly read. + VarHandle.releaseFence(); + return true; + } + return false; + } + + public final boolean isClosed() { + return configuration.isClosed(); + } + + public final void startAsync(long startNanos) { + synchronized (this) { + if (configuration.isStarted()) { + throw new IllegalStateException("Event stream can only be started once"); + } + StreamConfiguration c = new StreamConfiguration(configuration); + c.setStartNanos(startNanos); + c.setStarted(true); + updateConfiguration(c); + } + thread.start(); + } + + public final void start(long startNanos) { + synchronized (this) { + if (configuration.isStarted()) { + throw new IllegalStateException("Event stream can only be started once"); + } + StreamConfiguration c = new StreamConfiguration(configuration); + c.setStartNanos(startNanos); + c.setStarted(true); + updateConfiguration(c); + } + run(); + } + + public final void awaitTermination(Duration timeout) { + Objects.requireNonNull(timeout); + if (thread != Thread.currentThread()) { + try { + thread.join(timeout.toMillis()); + } catch (InterruptedException e) { + // ignore + } + } + } + + public final void awaitTermination() { + awaitTermination(Duration.ofMillis(0)); + } + + abstract public void close(); +} \ No newline at end of file diff -r 025c9b8eaefd -r 838f9a7635b6 src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri Jul 12 15:04:28 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Wed Jul 31 14:07:44 2019 +0200 @@ -121,6 +121,8 @@ /** * Reads an event and returns null when segment or chunk ends. + * + * @param awaitNewEvents wait for new data. */ public RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException { long absoluteChunkEnd = chunkHeader.getEnd(); @@ -192,7 +194,9 @@ } private boolean awaitUpdatedHeader(long absoluteChunkEnd) throws IOException { - Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes"); + if (Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO)) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes"); + } while (true) { chunkHeader.refresh(); if (absoluteChunkEnd != chunkHeader.getEnd()) { @@ -209,7 +213,7 @@ long thisCP = chunkHeader.getConstantPoolPosition() + chunkHeader.getAbsoluteChunkStart(); long lastCP = -1; long delta = -1; - boolean log = Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE); + boolean logTrace = Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE); while (thisCP != abortCP && delta != 0) { input.position(thisCP); lastCP = thisCP; @@ -226,9 +230,11 @@ int poolCount = input.readInt(); final long logLastCP = lastCP; final long logDelta = delta; - Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> { - return "New constant pool: startPosition=" + logLastCP + ", size=" + size + ", deltaToNext=" + logDelta + ", flush=" + flush + ", poolCount=" + poolCount; - }); + if (logTrace) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> { + return "New constant pool: startPosition=" + logLastCP + ", size=" + size + ", deltaToNext=" + logDelta + ", flush=" + flush + ", poolCount=" + poolCount; + }); + } for (int i = 0; i < poolCount; i++) { long id = input.readLong(); // type id ConstantLookup lookup = constantLookups.get(id); @@ -251,7 +257,7 @@ if (count == 0) { throw new InternalError("Pool " + type.getName() + " must contain at least one element "); } - if (log) { + if (logTrace) { Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant Pool " + i + ": " + type.getName()); } for (int j = 0; j < count; j++) { diff -r 025c9b8eaefd -r 838f9a7635b6 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri Jul 12 15:04:28 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Wed Jul 31 14:07:44 2019 +0200 @@ -45,43 +45,52 @@ */ final class EventDirectoryStream implements EventStream { - static final class DirectoryConsumer extends EventConsumer { + static final class DirectoryStream extends AbstractEventStream { private static final Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); private static final int DEFAULT_ARRAY_SIZE = 10_000; + private final RepositoryFiles repositoryFiles; + private ChunkParser chunkParser; private RecordedEvent[] sortedList; protected long chunkStartNanos; - public DirectoryConsumer(AccessControlContext acc, Path p) throws IOException { + public DirectoryStream(AccessControlContext acc, Path p) throws IOException { super(acc); repositoryFiles = new RepositoryFiles(p); } @Override public void process() throws IOException { - chunkStartNanos = startNanos; + StreamConfiguration c1 = configuration; + chunkStartNanos = c1.getStartNanos(); Path path; - if (startTime == EventConsumer.NEXT_EVENT) { + if (c1.getStartTime() == AbstractEventStream.NEXT_EVENT) { // TODO: Need to skip forward to the next event // For now, use the last chunk. path = repositoryFiles.lastPath(); } else { path = repositoryFiles.nextPath(chunkStartNanos); } + if (path == null) { // closed + return; + } chunkStartNanos = repositoryFiles.getTimestamp(path) + 1; try (RecordingInput input = new RecordingInput(path.toFile())) { - chunkParser = new ChunkParser(input, this.reuse); + chunkParser = new ChunkParser(input, c1.getReuse()); while (!isClosed()) { boolean awaitnewEvent = false; while (!isClosed() && !chunkParser.isChunkFinished()) { - chunkParser.setReuse(this.reuse); - chunkParser.setOrdered(this.ordered); - chunkParser.setFirstNanos(startNanos); + final StreamConfiguration c2 = configuration; + boolean ordered = c2.getOrdered(); + chunkParser.setReuse(c2.getReuse()); + chunkParser.setOrdered(ordered); + chunkParser.setFirstNanos(c2.getStartNanos()); chunkParser.resetEventCache(); - chunkParser.setParserFilter(this.eventFilter); + chunkParser.setParserFilter(c2.getFilter()); chunkParser.updateEventParsers(); + clearLastDispatch(); if (ordered) { awaitnewEvent = processOrdered(awaitnewEvent); } else { @@ -101,6 +110,7 @@ } } + private boolean processOrdered(boolean awaitNewEvents) throws IOException { if (sortedList == null) { sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE]; @@ -139,39 +149,36 @@ while (true) { RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); if (e == null) { - awaitNewEvents = true; - break; + return true; } else { dispatch(e); } } - return awaitNewEvents; } @Override public void close() { + setClosed(true); repositoryFiles.close(); } } - private final EventConsumer eventConsumer; + private final AbstractEventStream eventStream; public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException { - eventConsumer = new DirectoryConsumer(acc, p); - eventConsumer.startTime = startTime; + eventStream = new DirectoryStream(acc, p); + eventStream.setStartTime(startTime); } + @Override public void close() { - eventConsumer.close(); + eventStream.close(); } + @Override public void onFlush(Runnable action) { Objects.requireNonNull(action); - eventConsumer.onFlush(action); - } - - void start(long startNanos) { - eventConsumer.start(startNanos); + eventStream.onFlush(action); } @Override @@ -184,58 +191,62 @@ startAsync(Instant.now().toEpochMilli() * 1000 * 1000L); } - void startAsync(long startNanos) { - eventConsumer.startAsync(startNanos); - } - @Override public void onEvent(Consumer action) { Objects.requireNonNull(action); - eventConsumer.onEvent(action); + eventStream.onEvent(action); } @Override public void onEvent(String eventName, Consumer action) { Objects.requireNonNull(eventName); Objects.requireNonNull(action); - eventConsumer.onEvent(eventName, action); + eventStream.onEvent(eventName, action); } @Override public void onClose(Runnable action) { Objects.requireNonNull(action); - eventConsumer.addCloseAction(action); + eventStream.addCloseAction(action); } @Override public boolean remove(Object action) { Objects.requireNonNull(action); - return eventConsumer.remove(action); + return eventStream.remove(action); } @Override public void awaitTermination(Duration timeout) { Objects.requireNonNull(timeout); - eventConsumer.awaitTermination(timeout); + eventStream.awaitTermination(timeout); } @Override public void awaitTermination() { - eventConsumer.awaitTermination(Duration.ofMillis(0)); + eventStream.awaitTermination(Duration.ofMillis(0)); } @Override public void setReuse(boolean reuse) { - eventConsumer.setReuse(reuse); + eventStream.setReuse(reuse); } @Override public void setOrdered(boolean ordered) { - eventConsumer.setOrdered(ordered); + eventStream.setOrdered(ordered); } @Override public void setStartTime(Instant startTime) { - eventConsumer.setStartTime(startTime); + eventStream.setStartTime(startTime); + } + + public void start(long startNanos) { + eventStream.start(startNanos); + } + + public void startAsync(long startNanos) { + eventStream.startAsync(startNanos); } } diff -r 025c9b8eaefd -r 838f9a7635b6 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri Jul 12 15:04:28 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Wed Jul 31 14:07:44 2019 +0200 @@ -43,39 +43,41 @@ */ final class EventFileStream implements EventStream { - private final static class FileConsumer extends EventConsumer { + private final static class FileStream extends AbstractEventStream { private static final int DEFAULT_ARRAY_SIZE = 100_000; + private final RecordingInput input; + private ChunkParser chunkParser; - private boolean reuse = true; private RecordedEvent[] sortedList; - private boolean ordered; - public FileConsumer(AccessControlContext acc, RecordingInput input) throws IOException { + public FileStream(AccessControlContext acc, Path path) throws IOException { super(acc); - this.input = input; - } + this.input = new RecordingInput(path.toFile()); +; } @Override public void process() throws IOException { - chunkParser = new ChunkParser(input, reuse); + StreamConfiguration c1 = configuration; + chunkParser = new ChunkParser(input, c1.getReuse()); while (!isClosed()) { - boolean reuse = this.reuse; - boolean ordered = this.ordered; - chunkParser.setReuse(reuse); + StreamConfiguration c2 = configuration; + boolean ordered = c2.getOrdered(); + chunkParser.setReuse(c2.getReuse()); chunkParser.setOrdered(ordered); chunkParser.resetEventCache(); - chunkParser.setParserFilter(eventFilter); + chunkParser.setParserFilter(c2.getFiler()); chunkParser.updateEventParsers(); + clearLastDispatch(); if (ordered) { processOrdered(); } else { processUnordered(); } + runFlushActions(); if (chunkParser.isLastChunk()) { return; } - runFlushActions(); chunkParser = chunkParser.nextChunkParser(); } } @@ -115,106 +117,94 @@ } } - public void setReuse(boolean reuse) { - this.reuse = reuse; - } - - public void setOrdered(boolean ordered) { - this.ordered = ordered; - } - @Override public void close() { - + setClosed(true);; + runCloseActions(); + try { + input.close(); + } catch (IOException e) { + // ignore + } } - - } - private final RecordingInput input; - private final FileConsumer eventConsumer; + private final FileStream eventStream; public EventFileStream(Path path, Instant from, Instant to) throws IOException { Objects.requireNonNull(path); - input = new RecordingInput(path.toFile()); - eventConsumer = new FileConsumer(AccessController.getContext(), input); + eventStream = new FileStream(AccessController.getContext(), path); } @Override public void onEvent(Consumer action) { Objects.requireNonNull(action); - eventConsumer.onEvent(action); + eventStream.onEvent(action); } @Override public void onEvent(String eventName, Consumer action) { Objects.requireNonNull(eventName); Objects.requireNonNull(action); - eventConsumer.onEvent(eventName, action); + eventStream.onEvent(eventName, action); } @Override public void onFlush(Runnable action) { Objects.requireNonNull(action); - eventConsumer.onFlush(action); + eventStream.onFlush(action); } @Override public void onClose(Runnable action) { Objects.requireNonNull(action); - eventConsumer.addCloseAction(action); + eventStream.addCloseAction(action); } @Override public void close() { - eventConsumer.setClosed(true); - eventConsumer.runCloseActions(); - try { - input.close(); - } catch (IOException e) { - // ignore - } + eventStream.close(); } @Override public boolean remove(Object action) { Objects.requireNonNull(action); - return eventConsumer.remove(action); + return eventStream.remove(action); } @Override public void start() { - eventConsumer.start(0); + eventStream.start(0); } @Override public void setReuse(boolean reuse) { - eventConsumer.setReuse(reuse); + eventStream.setReuse(reuse); } @Override public void startAsync() { - eventConsumer.startAsync(0); + eventStream.startAsync(0); } @Override public void awaitTermination(Duration timeout) { Objects.requireNonNull(timeout); - eventConsumer.awaitTermination(timeout); + eventStream.awaitTermination(timeout); } @Override public void awaitTermination() { - eventConsumer.awaitTermination(); + eventStream.awaitTermination(); } @Override public void setOrdered(boolean ordered) { - eventConsumer.setOrdered(ordered); + eventStream.setOrdered(ordered); } @Override public void setStartTime(Instant startTime) { - eventConsumer.setStartTime(startTime); + eventStream.setStartTime(startTime); } } diff -r 025c9b8eaefd -r 838f9a7635b6 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri Jul 12 15:04:28 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Wed Jul 31 14:07:44 2019 +0200 @@ -50,7 +50,7 @@ * during reading */ public static EventStream openRepository(Path directory) throws IOException { - return new EventDirectoryStream(AccessController.getContext(), directory, EventConsumer.NEXT_EVENT); + return new EventDirectoryStream(AccessController.getContext(), directory, AbstractEventStream.NEXT_EVENT); } /** diff -r 025c9b8eaefd -r 838f9a7635b6 src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java Fri Jul 12 15:04:28 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java Wed Jul 31 14:07:44 2019 +0200 @@ -360,7 +360,8 @@ } public static boolean exists(SafePath safePath) throws IOException { - return doPrivilegedIOWithReturn(() -> Files.exists(safePath.toPath())); + // Files.exist(path) is allocation intensive + return doPrivilegedIOWithReturn(() -> safePath.toPath().toFile().exists()); } public static boolean isDirectory(SafePath safePath) throws IOException { diff -r 025c9b8eaefd -r 838f9a7635b6 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Fri Jul 12 15:04:28 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Wed Jul 31 14:07:44 2019 +0200 @@ -66,7 +66,7 @@ public Path nextPath(long startTimeNanos) { while (!closed) { if (startTimeNanos == -1) { - Entry e = pathSet.lastEntry(); + Entry e = pathSet.lastEntry(); if (e != null) { return e.getValue(); } @@ -106,17 +106,16 @@ boolean foundNew = false; List added = new ArrayList<>(); Set current = new HashSet<>(); - if (!Files.exists(repo)) { - // Repository removed, probably due to shutdown - return true; - } - try (DirectoryStream dirStream = Files.newDirectoryStream(repo, "*.jfr")) { + try (DirectoryStream dirStream = Files.newDirectoryStream(repo)) { for (Path p : dirStream) { if (!pathLookup.containsKey(p)) { - added.add(p); - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath()); + String s = p.toString(); + if (s.endsWith(".jfr")) { + added.add(p); + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "New file found: " + p.toAbsolutePath()); + } + current.add(p); } - current.add(p); } } List removed = new ArrayList<>(); @@ -160,5 +159,4 @@ } } - } \ No newline at end of file diff -r 025c9b8eaefd -r 838f9a7635b6 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java Wed Jul 31 14:07:44 2019 +0200 @@ -0,0 +1,2 @@ +package jdk.jfr.internal.consumer; + diff -r 025c9b8eaefd -r 838f9a7635b6 test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnFlush.java --- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnFlush.java Fri Jul 12 15:04:28 2019 +0200 +++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestOnFlush.java Wed Jul 31 14:07:44 2019 +0200 @@ -43,12 +43,12 @@ } public static void main(String... args) throws Exception { - testOnFLushNull(); + testOnFlushNull(); testOneEvent(); testNoEvent(); } - private static void testOnFLushNull() { + private static void testOnFlushNull() { try (RecordingStream rs = new RecordingStream()) { try { rs.onFlush(null);