# HG changeset patch # User egahlin # Date 1567451321 -7200 # Node ID be121cbf32847ef91e872e49f9693e6835b3013e # Parent 269bbe414580161dab0f1871d5505984dc705dbd Clean up class hiercharchy diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Mon Sep 02 21:08:41 2019 +0200 @@ -45,6 +45,7 @@ import jdk.jfr.internal.LogTag; import jdk.jfr.internal.Logger; import jdk.jfr.internal.LongMap; +import jdk.jfr.internal.SecuritySupport; import jdk.jfr.internal.Utils; import jdk.jfr.internal.consumer.InternalEventFilter; @@ -59,18 +60,18 @@ * - security * */ -abstract class AbstractEventStream implements Runnable { +abstract class AbstractEventStream implements EventStream { - public static final class StreamConfiguration { + protected static final class StreamConfiguration { private static final Runnable[] NO_ACTIONS = new Runnable[0]; + private Consumer[] errorActions = new Consumer[0]; private Runnable[] flushActions = NO_ACTIONS; private Runnable[] closeActions = NO_ACTIONS; - private Runnable[] errorActions = NO_ACTIONS; - - private EventDispatcher[] dispatchers = NO_DISPATCHERS; + private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS; private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; private LongMap dispatcherLookup = new LongMap<>(); + private boolean changedConfiguration = false; private boolean closed = false; private boolean reuse = true; @@ -84,6 +85,7 @@ public StreamConfiguration(StreamConfiguration configuration) { this.flushActions = configuration.flushActions; this.closeActions = configuration.closeActions; + this.errorActions = configuration.errorActions; this.dispatchers = configuration.dispatchers; this.eventFilter = configuration.eventFilter; this.closed = configuration.closed; @@ -100,50 +102,50 @@ public StreamConfiguration() { } - final public StreamConfiguration remove(Object action) { + public StreamConfiguration remove(Object action) { flushActions = remove(flushActions, action); closeActions = remove(closeActions, action); dispatchers = removeDispatch(dispatchers, action); return this; } - final public StreamConfiguration addDispatcher(EventDispatcher e) { + public StreamConfiguration addDispatcher(EventDispatcher e) { dispatchers = add(dispatchers, e); eventFilter = buildFilter(dispatchers); dispatcherLookup = new LongMap<>(); return this; } - final public StreamConfiguration addFlushAction(Runnable action) { + public StreamConfiguration addFlushAction(Runnable action) { flushActions = add(flushActions, action); return this; } - final public StreamConfiguration addCloseAction(Runnable action) { + public StreamConfiguration addCloseAction(Runnable action) { closeActions = add(closeActions, action); return this; } - public StreamConfiguration addErrorAction(Runnable action) { + public StreamConfiguration addErrorAction(Consumer action) { errorActions = add(errorActions, action); return this; } - final public StreamConfiguration setClosed(boolean closed) { + public StreamConfiguration setClosed(boolean closed) { this.closed = closed; changedConfiguration = true; return this; } - final public boolean isClosed() { + public boolean isClosed() { return closed; } - final public Runnable[] getCloseActions() { + public Runnable[] getCloseActions() { return closeActions; } - final public Runnable[] getFlushActions() { + public Runnable[] getFlushActions() { return flushActions; } @@ -197,13 +199,13 @@ return ef; } - final public StreamConfiguration setReuse(boolean reuse) { + public StreamConfiguration setReuse(boolean reuse) { this.reuse = reuse; changedConfiguration = true; return this; } - final public StreamConfiguration setOrdered(boolean ordered) { + public StreamConfiguration setOrdered(boolean ordered) { this.ordered = ordered; changedConfiguration = true; return this; @@ -216,14 +218,14 @@ return this; } - final public StreamConfiguration setStartTime(Instant startTime) { + public StreamConfiguration setStartTime(Instant startTime) { this.startTime = startTime; this.startNanos = Utils.timeToNanos(startTime); changedConfiguration = true; return this; } - final public Instant getStartTime() { + public Instant getStartTime() { return startTime; } @@ -231,50 +233,50 @@ return endTime; } - final public boolean isStarted() { + public boolean isStarted() { return started; } - final public StreamConfiguration setStartNanos(long startNanos) { + public StreamConfiguration setStartNanos(long startNanos) { this.startNanos = startNanos; changedConfiguration = true; return this; } - final public void setStarted(boolean started) { + public void setStarted(boolean started) { this.started = started; changedConfiguration = true; } - final public boolean hasChanged() { + public boolean hasChanged() { return changedConfiguration; } - final public boolean getReuse() { + public boolean getReuse() { return reuse; } - final public boolean getOrdered() { + public boolean getOrdered() { return ordered; } - final public InternalEventFilter getFiler() { + public InternalEventFilter getFiler() { return eventFilter; } - final public long getStartNanos() { + public long getStartNanos() { return startNanos; } - final public long getEndNanos() { + public long getEndNanos() { return endNanos; } - final public InternalEventFilter getFilter() { + public InternalEventFilter getFilter() { return eventFilter; } - final public String toString() { + public String toString() { StringBuilder sb = new StringBuilder(); for (Runnable flush : flushActions) { sb.append("Flush Action: ").append(flush).append("\n"); @@ -282,6 +284,9 @@ for (Runnable close : closeActions) { sb.append("Close Action: " + close + "\n"); } + for (Consumer error : errorActions) { + sb.append("Error Action: " + error + "\n"); + } for (EventDispatcher dispatcher : dispatchers) { sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n"); } @@ -301,9 +306,8 @@ } } - final static class EventDispatcher { + private final static class EventDispatcher { final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; - final private String eventName; final private Consumer action; @@ -325,13 +329,13 @@ } } - public final static Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); + final static Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); - private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; - private final AccessControlContext accessControlContext; private final Thread thread; private final boolean active; - protected final Runnable flushOperation = () -> runFlushActions(); + private final Runnable flushOperation = () -> runFlushActions(); + private final AccessControlContext accessControllerContext; + private final Object configurationLock = new Object(); // Modified by updateConfiguration() protected volatile StreamConfiguration configuration = new StreamConfiguration(); @@ -341,21 +345,231 @@ private EventDispatcher[] lastEventDispatch; public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException { - this.accessControlContext = acc; + this.accessControllerContext = Objects.requireNonNull(acc); this.active = active; - // Create thread object in constructor to ensure caller has - // permission before constructing object - thread = new Thread(this); + this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc)); + } + + @Override + abstract public void start(); + + @Override + abstract public void startAsync(); + + @Override + abstract public void close(); + + // Purpose of synchronizing the following methods is + // to serialize changes to the configuration, so only one + // thread at a time can change the configuration. + // + // The purpose is not to guard the configuration field. A new + // configuration is published using updateConfiguration + // + @Override + public final void setOrdered(boolean ordered) { + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered)); + } + } + + @Override + public final void setReuse(boolean reuse) { + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse)); + } + } + + @Override + public final void setStartTime(Instant startTime) { + Objects.nonNull(startTime); + synchronized (configurationLock) { + if (configuration.isStarted()) { + throw new IllegalStateException("Stream is already started"); + } + if (startTime.isBefore(Instant.EPOCH)) { + startTime = Instant.EPOCH; + } + updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime)); + } + } + + @Override + public final void setEndTime(Instant endTime) { + Objects.requireNonNull(endTime); + synchronized (configurationLock) { + if (configuration.isStarted()) { + throw new IllegalStateException("Stream is already started"); + } + updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime)); + } + } + + @Override + public final void onEvent(Consumer action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + add(new EventDispatcher(action)); + } + } + + @Override + public final void onEvent(String eventName, Consumer action) { + Objects.requireNonNull(eventName); + Objects.requireNonNull(action); + synchronized (configurationLock) { + add(new EventDispatcher(eventName, action)); + } + } + + @Override + public final void onFlush(Runnable action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action)); + } + } + + @Override + public final void onClose(Runnable action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action)); + } + } + + @Override + public final void onError(Consumer action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action)); + } + } + + @Override + public final boolean remove(Object action) { + Objects.requireNonNull(action); + synchronized (configurationLock) { + return updateConfiguration(new StreamConfiguration(configuration).remove(action)); + } } - public final void run() { - AccessController.doPrivileged(new PrivilegedAction() { - @Override - public Void run() { - execute(); - return null; + @Override + public final void awaitTermination() { + awaitTermination(Duration.ofMillis(0)); + } + + @Override + public final void awaitTermination(Duration timeout) { + Objects.requireNonNull(timeout); + if (thread != Thread.currentThread()) { + try { + thread.join(timeout.toMillis()); + } catch (InterruptedException e) { + // ignore + } + } + } + + protected abstract void process() throws Exception; + + protected final void clearLastDispatch() { + lastEventDispatch = null; + lastEventType = null; + } + + protected final void dispatch(RecordedEvent event) { + EventType type = event.getEventType(); + EventDispatcher[] dispatchers = null; + if (type == lastEventType) { + dispatchers = lastEventDispatch; + } else { + dispatchers = configuration.dispatcherLookup.get(type.getId()); + if (dispatchers == null) { + List list = new ArrayList<>(); + for (EventDispatcher e : configuration.getDispatchers()) { + if (e.accepts(type)) { + list.add(e); + } + } + dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]); + configuration.dispatcherLookup.put(type.getId(), dispatchers); + } + lastEventDispatch = dispatchers; + } + for (int i = 0; i < dispatchers.length; i++) { + try { + dispatchers[i].offer(event); + } catch (Exception e) { + handleError(e); } - }, accessControlContext); + } + } + + protected final void runCloseActions() { + Runnable[] closeActions = configuration.getCloseActions(); + for (int i = 0; i < closeActions.length; i++) { + try { + closeActions[i].run(); + } catch (Exception e) { + handleError(e); + } + } + } + + protected final void setClosed(boolean closed) { + synchronized (configurationLock) { + updateConfiguration(new StreamConfiguration(configuration).setClosed(closed)); + } + } + + protected final boolean isClosed() { + return configuration.isClosed(); + } + + protected final void startAsync(long startNanos) { + startInternal(startNanos); + thread.start(); + } + + protected final void start(long startNanos) { + startInternal(startNanos); + run(accessControllerContext); + } + + protected final Runnable getFlushOperation() { + return flushOperation; + } + + private void add(EventDispatcher e) { + updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e)); + } + + private boolean updateConfiguration(StreamConfiguration newConfiguration) { + if (!Thread.holdsLock(configurationLock)) { + throw new InternalError("Modification of configuration without proper lock"); + } + if (newConfiguration.hasChanged()) { + // Publish objects held by configuration object + VarHandle.releaseFence(); + configuration = newConfiguration; + return true; + } + return false; + } + + private void startInternal(long startNanos) { + synchronized (configurationLock) { + if (configuration.isStarted()) { + throw new IllegalStateException("Event stream can only be started once"); + } + StreamConfiguration c = new StreamConfiguration(configuration); + if (active) { + c.setStartNanos(startNanos); + } + c.setStarted(true); + updateConfiguration(c); + } } private void execute() { @@ -369,202 +583,41 @@ } } - public abstract void process() throws Exception; - - protected final void clearLastDispatch() { - lastEventDispatch = null; - lastEventType = null; - } - - protected final void dispatch(RecordedEvent event) { - EventType type = event.getEventType(); - EventDispatcher[] ret = null; - if (type == lastEventType) { - ret = lastEventDispatch; - } else { - ret = configuration.dispatcherLookup.get(type.getId()); - if (ret == null) { - List list = new ArrayList<>(); - for (EventDispatcher e : configuration.getDispatchers()) { - if (e.accepts(type)) { - list.add(e); - } - } - ret = list.isEmpty() ? NO_DISPATCHERS : list.toArray(new EventDispatcher[0]); - configuration.dispatcherLookup.put(type.getId(), ret); - } - lastEventDispatch = ret; + private void handleError(Throwable e) { + Consumer[] consumers = configuration.errorActions; + if (consumers.length == 0) { + defaultErrorHandler(e); + return; } - for (int i = 0; i < ret.length; i++) { - try { - ret[i].offer(event); - } catch (Exception e) { - handleError(e); - } + for (int i = 0; i < consumers.length; i++) { + @SuppressWarnings("unchecked") + Consumer c = (Consumer) consumers[i]; + c.accept(e); } } - protected final void handleError(Throwable e) { - StreamConfiguration c = configuration; - if (c.errorActions.length == 0) { - defaultErrorHandler(e); - return; - } - for (Runnable r : c.errorActions) { - r.run(); - } - } - - protected final void defaultErrorHandler(Throwable e) { + private void defaultErrorHandler(Throwable e) { e.printStackTrace(); } - public final void runCloseActions() { - Runnable[] cas = configuration.getCloseActions(); - for (int i = 0; i < cas.length; i++) { + private void runFlushActions() { + Runnable[] flushActions = configuration.getFlushActions(); + for (int i = 0; i < flushActions.length; i++) { try { - cas[i].run(); + flushActions[i].run(); } catch (Exception e) { handleError(e); } } } - public final void runFlushActions() { - Runnable[] fas = configuration.getFlushActions(); - for (int i = 0; i < fas.length; i++) { - try { - fas[i].run(); - } catch (Exception e) { - handleError(e); + private void run(AccessControlContext acc) { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + execute(); + return null; } - } - - } - - // Purpose of synchronizing the following methods is - // to serialize changes to the configuration, so only one - // thread at a time can change the configuration. - // - // The purpose is not to guard the configuration field. A new - // configuration is published using updateConfiguration - // - public final synchronized boolean remove(Object action) { - return updateConfiguration(new StreamConfiguration(configuration).remove(action)); - } - - public final synchronized void onEvent(Consumer action) { - add(new EventDispatcher(action)); - } - - public final synchronized void onEvent(String eventName, Consumer action) { - add(new EventDispatcher(eventName, action)); - } - - private final synchronized void add(EventDispatcher e) { - updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e)); - } - - public final synchronized void onFlush(Runnable action) { - updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action)); - } - - public final synchronized void addCloseAction(Runnable action) { - updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action)); - } - - public final synchronized void addErrorAction(Runnable action) { - updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action)); - } - - public final synchronized void setClosed(boolean closed) { - updateConfiguration(new StreamConfiguration(configuration).setClosed(closed)); - } - - public final synchronized void setReuse(boolean reuse) { - updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse)); - } - - public final synchronized void setOrdered(boolean ordered) { - updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered)); - } - - public final synchronized void setStartNanos(long startNanos) { - updateConfiguration(new StreamConfiguration(configuration).setStartNanos(startNanos)); + }, acc); } - - public final synchronized void setStartTime(Instant startTime) { - Objects.nonNull(startTime); - if (configuration.isStarted()) { - throw new IllegalStateException("Stream is already started"); - } - if (startTime.isBefore(Instant.EPOCH)) { - startTime = Instant.EPOCH; - } - updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime)); - } - - public final synchronized void setEndTime(Instant endTime) { - if (configuration.isStarted()) { - throw new IllegalStateException("Stream is already started"); - } - updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime)); - } - - protected boolean updateConfiguration(StreamConfiguration newConfiguration) { - if (!Thread.holdsLock(this)) { - throw new InternalError("Modification of configuration without proper lock"); - } - if (newConfiguration.hasChanged()) { - // Publish objects held by configuration object - VarHandle.releaseFence(); - configuration = newConfiguration; - return true; - } - return false; - } - - public final boolean isClosed() { - return configuration.isClosed(); - } - - public final void startAsync(long startNanos) { - startInternal(startNanos); - thread.start(); - } - - public final void start(long startNanos) { - startInternal(startNanos); - run(); - } - - private synchronized void startInternal(long startNanos) { - if (configuration.isStarted()) { - throw new IllegalStateException("Event stream can only be started once"); - } - StreamConfiguration c = new StreamConfiguration(configuration); - if (active) { - c.setStartNanos(startNanos); - } - c.setStarted(true); - updateConfiguration(c); - } - - public final void awaitTermination(Duration timeout) { - Objects.requireNonNull(timeout); - if (thread != Thread.currentThread()) { - try { - thread.join(timeout.toMillis()); - } catch (InterruptedException e) { - // ignore - } - } - } - - public final void awaitTermination() { - awaitTermination(Duration.ofMillis(0)); - } - - abstract public void close(); - } \ No newline at end of file diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Mon Sep 02 21:08:41 2019 +0200 @@ -74,8 +74,8 @@ private boolean reuse; private boolean ordered; private boolean resetEventCache; - private long firstNanos = 0; - private long lastNanos = Long.MAX_VALUE; + private long filterStart = 0; + private long filterEnd = Long.MAX_VALUE; private Runnable flushOperation; public ChunkParser(RecordingInput input, boolean reuse) throws IOException { @@ -398,17 +398,17 @@ // Need to call updateEventParsers() for // change to take effect - public void setFirstNanos(long firstNanos) { + public void setFilterStart(long filterStart) { long chunkStart = chunkHeader.getStartNanos(); // Optimization. - if (firstNanos < chunkStart - 1_000_000_000L) { - firstNanos = 0; + if (filterStart < chunkStart - 1_000_000_000L) { + filterStart = 0; } - this.firstNanos = firstNanos; + this.filterStart = filterStart; } - public void setLastNanos(long lastNanos) { - this.lastNanos = lastNanos; + public void setFilterEnd(long filterEnd) { + this.filterEnd = filterEnd; } // Need to call updateEventParsers() for @@ -424,8 +424,8 @@ String name = ep.getEventType().getName(); ep.setOrdered(ordered); ep.setReuse(reuse); - ep.setFirstNanos(firstNanos); - ep.setLastNanos(lastNanos); + ep.setFilterStart(filterStart); + ep.setFilterEnd(filterEnd); if (resetEventCache) { ep.resetCache(); } diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java Mon Sep 02 21:08:41 2019 +0200 @@ -38,5 +38,4 @@ public Object getCurrent(long key) { return current.get(key); } - } diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Mon Sep 02 21:08:41 2019 +0200 @@ -28,12 +28,9 @@ import java.io.IOException; import java.nio.file.Path; import java.security.AccessControlContext; -import java.time.Duration; import java.time.Instant; import java.util.Arrays; -import java.util.Comparator; import java.util.Objects; -import java.util.function.Consumer; import jdk.jfr.internal.Utils; import jdk.jfr.internal.consumer.FileAccess; @@ -45,156 +42,26 @@ * with chunk files. * */ -class EventDirectoryStream implements EventStream { - - static final class DirectoryStream extends AbstractEventStream { - - private static final Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks); - private static final int DEFAULT_ARRAY_SIZE = 10_000; - - private final RepositoryFiles repositoryFiles; - private final boolean active; - private final FileAccess fileAccess; - private ChunkParser chunkParser; - private RecordedEvent[] sortedList; - protected long chunkStartNanos; - - public DirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException { - super(acc, active); - this.fileAccess = fileAccess; - this.active = active; - repositoryFiles = new RepositoryFiles(fileAccess, p); - } - - @Override - public void process() throws Exception { - final StreamConfiguration c1 = configuration; - Path path; - boolean validStartTime = active || c1.getStartTime() != null; - if (validStartTime) { - path = repositoryFiles.firstPath(c1.getStartNanos()); - } else { - path = repositoryFiles.lastPath(); - } - if (path == null) { // closed - return; - } - chunkStartNanos = repositoryFiles.getTimestamp(path); - try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) { - chunkParser = new ChunkParser(input, c1.getReuse()); - long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration(); - long start = validStartTime ? c1.getStartNanos() : segmentStart; - long end = c1.getEndTime() != null ? c1.getEndNanos() : Long.MAX_VALUE; - while (!isClosed()) { - boolean awaitnewEvent = false; - while (!isClosed() && !chunkParser.isChunkFinished()) { - final StreamConfiguration c2 = configuration; - boolean ordered = c2.getOrdered(); - chunkParser.setFlushOperation(flushOperation); - chunkParser.setReuse(c2.getReuse()); - chunkParser.setOrdered(ordered); - chunkParser.setFirstNanos(start); - chunkParser.setLastNanos(end); - chunkParser.resetEventCache(); - chunkParser.setParserFilter(c2.getFilter()); - chunkParser.updateEventParsers(); - clearLastDispatch(); - if (ordered) { - awaitnewEvent = processOrdered(awaitnewEvent); - } else { - awaitnewEvent = processUnordered(awaitnewEvent); - } - if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > end) { - close(); - return; - } - } - +final class EventDirectoryStream extends AbstractEventStream { + private final RepositoryFiles repositoryFiles; + private final boolean active; + private final FileAccess fileAccess; + private ChunkParser chunkParser; + private long chunkStartNanos; + private RecordedEvent[] sortedList; - if (isClosed()) { - return; - } - long durationNanos = chunkParser.getChunkDuration(); - path = repositoryFiles.nextPath(chunkStartNanos + durationNanos); - if (path == null) { - return; // stream closed - } - chunkStartNanos = repositoryFiles.getTimestamp(path); - input.setFile(path); - chunkParser = chunkParser.newChunkParser(); - // No need filter when we reach new chunk - // start = 0; - } - } - } - - private boolean processOrdered(boolean awaitNewEvents) throws IOException { - if (sortedList == null) { - sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE]; - } - int index = 0; - while (true) { - RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); - if (e == null) { - // wait for new event with next call to - // readStreamingEvent() - awaitNewEvents = true; - break; - } - awaitNewEvents = false; - if (index == sortedList.length) { - sortedList = Arrays.copyOf(sortedList, sortedList.length * 2); - } - sortedList[index++] = e; - } - - // no events found - if (index == 0 && chunkParser.isChunkFinished()) { - return awaitNewEvents; - } - // at least 2 events, sort them - if (index > 1) { - Arrays.sort(sortedList, 0, index, END_TIME); - } - for (int i = 0; i < index; i++) { - dispatch(sortedList[i]); - } - return awaitNewEvents; - } - - private boolean processUnordered(boolean awaitNewEvents) throws IOException { - while (true) { - RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); - if (e == null) { - return true; - } else { - dispatch(e); - } - } - } - - @Override - public void close() { - setClosed(true); - repositoryFiles.close(); - } - } - - private final AbstractEventStream eventStream; - - public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess access, boolean active) throws IOException { - eventStream = new DirectoryStream(acc, p, access, active); + public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException { + super(acc, active); + this.fileAccess = Objects.requireNonNull(fileAccess); + this.active = active; + this.repositoryFiles = new RepositoryFiles(fileAccess, p); } @Override public void close() { - eventStream.close(); - } - - @Override - public void onFlush(Runnable action) { - Objects.requireNonNull(action); - eventStream.onFlush(action); + setClosed(true); + runCloseActions(); + repositoryFiles.close(); } @Override @@ -208,75 +75,108 @@ } @Override - public void onEvent(Consumer action) { - Objects.requireNonNull(action); - eventStream.onEvent(action); - } - - @Override - public void onEvent(String eventName, Consumer action) { - Objects.requireNonNull(eventName); - Objects.requireNonNull(action); - eventStream.onEvent(eventName, action); - } + public void process() throws Exception { + StreamConfiguration c = configuration; + Path path; + boolean validStartTime = active || c.getStartTime() != null; + if (validStartTime) { + path = repositoryFiles.firstPath(c.getStartNanos()); + } else { + path = repositoryFiles.lastPath(); + } + if (path == null) { // closed + return; + } + chunkStartNanos = repositoryFiles.getTimestamp(path); + try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) { + chunkParser = new ChunkParser(input, c.getReuse()); + long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration(); + long filtertStart = validStartTime ? c.getStartNanos() : segmentStart; + long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE; + while (!isClosed()) { + boolean awaitnewEvent = false; + while (!isClosed() && !chunkParser.isChunkFinished()) { + c = configuration; + boolean ordered = c.getOrdered(); + chunkParser.setFlushOperation(getFlushOperation()); + chunkParser.setReuse(c.getReuse()); + chunkParser.setOrdered(ordered); + chunkParser.setFilterStart(filtertStart); + chunkParser.setFilterEnd(filterEnd); + chunkParser.resetEventCache(); + chunkParser.setParserFilter(c.getFilter()); + chunkParser.updateEventParsers(); + clearLastDispatch(); + if (ordered) { + awaitnewEvent = processOrdered(awaitnewEvent); + } else { + awaitnewEvent = processUnordered(awaitnewEvent); + } + if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) { + close(); + return; + } + } - @Override - public void onClose(Runnable action) { - Objects.requireNonNull(action); - eventStream.addCloseAction(action); - } - - @Override - public boolean remove(Object action) { - Objects.requireNonNull(action); - return eventStream.remove(action); - } - - @Override - public void awaitTermination(Duration timeout) { - Objects.requireNonNull(timeout); - eventStream.awaitTermination(timeout); - } - - @Override - public void awaitTermination() { - eventStream.awaitTermination(Duration.ofMillis(0)); + if (isClosed()) { + return; + } + long durationNanos = chunkParser.getChunkDuration(); + path = repositoryFiles.nextPath(chunkStartNanos + durationNanos); + if (path == null) { + return; // stream closed + } + chunkStartNanos = repositoryFiles.getTimestamp(path); + input.setFile(path); + chunkParser = chunkParser.newChunkParser(); + // TODO: Optimization. No need filter when we reach new chunk + // Could set start = 0; + } + } } - @Override - public void setReuse(boolean reuse) { - eventStream.setReuse(reuse); - } + private boolean processOrdered(boolean awaitNewEvents) throws IOException { + if (sortedList == null) { + sortedList = new RecordedEvent[100_000]; + } + int index = 0; + while (true) { + RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); + if (e == null) { + // wait for new event with next call to + // readStreamingEvent() + awaitNewEvents = true; + break; + } + awaitNewEvents = false; + if (index == sortedList.length) { + sortedList = Arrays.copyOf(sortedList, sortedList.length * 2); + } + sortedList[index++] = e; + } - @Override - public void setOrdered(boolean ordered) { - eventStream.setOrdered(ordered); - } - - @Override - public void setStartTime(Instant startTime) { - eventStream.setStartTime(startTime); + // no events found + if (index == 0 && chunkParser.isChunkFinished()) { + return awaitNewEvents; + } + // at least 2 events, sort them + if (index > 1) { + Arrays.sort(sortedList, 0, index, END_TIME); + } + for (int i = 0; i < index; i++) { + dispatch(sortedList[i]); + } + return awaitNewEvents; } - @Override - public void setEndTime(Instant endTime) { - eventStream.setEndTime(endTime); - } - - - public void start(long startNanos) { - eventStream.start(startNanos); + private boolean processUnordered(boolean awaitNewEvents) throws IOException { + while (true) { + RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents); + if (e == null) { + return true; + } else { + dispatch(e); + } + } } - - public void startAsync(long startNanos) { - eventStream.startAsync(startNanos); - } - - @Override - public void onError(Consumer action) { - // TODO Auto-generated method stub - - } - - } diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Mon Sep 02 21:08:41 2019 +0200 @@ -28,12 +28,8 @@ import java.io.IOException; import java.nio.file.Path; import java.security.AccessControlContext; -import java.security.AccessController; -import java.time.Duration; -import java.time.Instant; import java.util.Arrays; import java.util.Objects; -import java.util.function.Consumer; import jdk.jfr.internal.consumer.FileAccess; import jdk.jfr.internal.consumer.RecordingInput; @@ -42,196 +38,110 @@ * Implementation of an event stream that operates against a recording file. * */ -final class EventFileStream implements EventStream { - - private final static class FileStream extends AbstractEventStream { - private static final int DEFAULT_ARRAY_SIZE = 100_000; - - private final RecordingInput input; - - private ChunkParser chunkParser; - private RecordedEvent[] sortedList; - - public FileStream(AccessControlContext acc, Path path) throws IOException { - super(acc, false); - this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED); -; } - - @Override - public void process() throws IOException { - final StreamConfiguration c1 = configuration; - long start = 0; - long end = Long.MAX_VALUE; - if (c1.getStartTime() != null) { - start = c1.getStartNanos(); - } - if (c1.getEndTime() != null) { - end = c1.getEndNanos(); - } +final class EventFileStream extends AbstractEventStream { + private final RecordingInput input; + private ChunkParser chunkParser; + private RecordedEvent[] sortedList; - chunkParser = new ChunkParser(input, c1.getReuse()); - while (!isClosed()) { - if (chunkParser.getStartNanos() > end) { - close(); - return; - } - StreamConfiguration c2 = configuration; - boolean ordered = c2.getOrdered(); - chunkParser.setFlushOperation(flushOperation); - chunkParser.setFirstNanos(start); - chunkParser.setLastNanos(end); - chunkParser.setReuse(c2.getReuse()); - chunkParser.setOrdered(ordered); - chunkParser.resetEventCache(); - chunkParser.setParserFilter(c2.getFiler()); - chunkParser.updateEventParsers(); - clearLastDispatch(); - if (ordered) { - processOrdered(); - } else { - processUnordered(); - } - if (chunkParser.isLastChunk()) { - return; - } - chunkParser = chunkParser.nextChunkParser(); - } - } - - private void processOrdered() throws IOException { - if (sortedList == null) { - sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE]; - } - RecordedEvent event; - int index = 0; - while (true) { - event = chunkParser.readEvent(); - if (event == null) { - Arrays.sort(sortedList, 0, index, END_TIME); - for (int i = 0; i < index; i++) { - dispatch(sortedList[i]); - } - return; - } - if (index == sortedList.length) { - RecordedEvent[] tmp = sortedList; - sortedList = new RecordedEvent[2 * tmp.length]; - System.arraycopy(tmp, 0, sortedList, 0, tmp.length); - } - sortedList[index++] = event; - } - } - - private void processUnordered() throws IOException { - RecordedEvent event; - while (!isClosed()) { - event = chunkParser.readEvent(); - if (event == null) { - return; - } - dispatch(event); - } - } - - @Override - public void close() { - setClosed(true);; - runCloseActions(); - try { - input.close(); - } catch (IOException e) { - // ignore - } - } - } - - private final FileStream eventStream; - - public EventFileStream(Path path, Instant from, Instant to) throws IOException { + public EventFileStream(AccessControlContext acc, Path path) throws IOException { + super(acc, false); Objects.requireNonNull(path); - eventStream = new FileStream(AccessController.getContext(), path); + this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED); } @Override - public void onEvent(Consumer action) { - Objects.requireNonNull(action); - eventStream.onEvent(action); + public void start() { + start(0); } @Override - public void onEvent(String eventName, Consumer action) { - Objects.requireNonNull(eventName); - Objects.requireNonNull(action); - eventStream.onEvent(eventName, action); - } - - @Override - public void onFlush(Runnable action) { - Objects.requireNonNull(action); - eventStream.onFlush(action); - } - - @Override - public void onClose(Runnable action) { - Objects.requireNonNull(action); - eventStream.addCloseAction(action); + public void startAsync() { + startAsync(0); } @Override public void close() { - eventStream.close(); - } - - @Override - public boolean remove(Object action) { - Objects.requireNonNull(action); - return eventStream.remove(action); - } - - @Override - public void start() { - eventStream.start(0); - } - - @Override - public void setReuse(boolean reuse) { - eventStream.setReuse(reuse); - } - - @Override - public void startAsync() { - eventStream.startAsync(0); + setClosed(true); + runCloseActions(); + try { + input.close(); + } catch (IOException e) { + // ignore + } } @Override - public void awaitTermination(Duration timeout) { - Objects.requireNonNull(timeout); - eventStream.awaitTermination(timeout); - } + public void process() throws IOException { + StreamConfiguration c = configuration; + long start = 0; + long end = Long.MAX_VALUE; + if (c.getStartTime() != null) { + start = c.getStartNanos(); + } + if (c.getEndTime() != null) { + end = c.getEndNanos(); + } - @Override - public void awaitTermination() { - eventStream.awaitTermination(); - } - - @Override - public void setOrdered(boolean ordered) { - eventStream.setOrdered(ordered); + chunkParser = new ChunkParser(input, c.getReuse()); + while (!isClosed()) { + if (chunkParser.getStartNanos() > end) { + close(); + return; + } + c = configuration; + boolean ordered = c.getOrdered(); + chunkParser.setFlushOperation(getFlushOperation()); + chunkParser.setFilterStart(start); + chunkParser.setFilterEnd(end); + chunkParser.setReuse(c.getReuse()); + chunkParser.setOrdered(ordered); + chunkParser.resetEventCache(); + chunkParser.setParserFilter(c.getFiler()); + chunkParser.updateEventParsers(); + clearLastDispatch(); + if (ordered) { + processOrdered(); + } else { + processUnordered(); + } + if (chunkParser.isLastChunk()) { + return; + } + chunkParser = chunkParser.nextChunkParser(); + } } - @Override - public void setStartTime(Instant startTime) { - eventStream.setStartTime(startTime); + private void processOrdered() throws IOException { + if (sortedList == null) { + sortedList = new RecordedEvent[10_000]; + } + RecordedEvent event; + int index = 0; + while (true) { + event = chunkParser.readEvent(); + if (event == null) { + Arrays.sort(sortedList, 0, index, END_TIME); + for (int i = 0; i < index; i++) { + dispatch(sortedList[i]); + } + return; + } + if (index == sortedList.length) { + RecordedEvent[] tmp = sortedList; + sortedList = new RecordedEvent[2 * tmp.length]; + System.arraycopy(tmp, 0, sortedList, 0, tmp.length); + } + sortedList[index++] = event; + } } - @Override - public void setEndTime(Instant endTime) { - eventStream.setEndTime(endTime); - } - - @Override - public void onError(Consumer action) { - // TODO Auto-generated method stub - + private void processUnordered() throws IOException { + while (!isClosed()) { + RecordedEvent event = chunkParser.readEvent(); + if (event == null) { + return; + } + dispatch(event); + } } } diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java Mon Sep 02 21:08:41 2019 +0200 @@ -79,5 +79,4 @@ Duration getThreshold() { return threshold; } - } diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Mon Sep 02 21:08:41 2019 +0200 @@ -174,12 +174,12 @@ } } - public void setFirstNanos(long firstNanos) { - this.filterStart = firstNanos; + public void setFilterStart(long filterStart) { + this.filterStart = filterStart; } - public void setLastNanos(long lastNanos) { - this.filterEnd = lastNanos; + public void setFilterEnd(long filterEnd) { + this.filterEnd = filterEnd; } public void setOrdered(boolean ordered) { diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Mon Sep 02 21:08:41 2019 +0200 @@ -103,7 +103,7 @@ * {@code checkRead} method denies read access to the file */ public static EventStream openFile(Path file) throws IOException { - return new EventFileStream(file, null, null); + return new EventFileStream(AccessController.getContext(), file); } /** diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Mon Sep 02 21:08:41 2019 +0200 @@ -47,13 +47,14 @@ * A recording stream produces events from the current JVM (Java Virtual * Machine). *

- * The following example records events using the default configuration and - * prints the Garbage Collection, CPU Load and JVM Information event. + * The following example, shows how to record events using the default + * configuration and print the Garbage Collection, CPU Load and JVM Information + * event to standard out. * *

  * 
- * var c = Configuration.getConfiguration("default");
- * try (var rs = new RecordingStream(c)) {
+ * Configuration c = Configuration.getConfiguration("default");
+ * try (RecordingStream rs = new RecordingStream(c)) {
  *     rs.onEvent("jdk.GarbageCollection", System.out::println);
  *     rs.onEvent("jdk.CPULoad", System.out::println);
  *     rs.onEvent("jdk.JVMInformation", System.out::println);
@@ -67,24 +68,10 @@
 public final class RecordingStream implements AutoCloseable, EventStream {
 
     private final Recording recording;
-    private final EventDirectoryStream stream;
+    private final EventDirectoryStream directoryStream;
 
     /**
      * Creates an event stream for the current JVM (Java Virtual Machine).
-     * 

- * The following example shows how to create a recording stream that prints - * CPU usage and information about garbage collections. - * - *

-     * 
-     * try (var rs = new RecordingStream()) {
-     *   rs.enable("jdk.GarbageCollection");
-     *   rs.enable("jdk.CPULoad").withPeriod(Duration.ofSeconds(1));
-     *   rs.onEvent(System.out::println);
-     *   rs.start();
-     * }
-     * 
-     * 
* * @throws IllegalStateException if Flight Recorder can't be created (for * example, if the Java Virtual Machine (JVM) lacks Flight Recorder @@ -100,8 +87,9 @@ this.recording = new Recording(); this.recording.setFlushInterval(Duration.ofMillis(1000)); try { - this.stream = new EventDirectoryStream(acc, null, SecuritySupport.PRIVILIGED, true); + this.directoryStream = new EventDirectoryStream(acc, null, SecuritySupport.PRIVILIGED, true); } catch (IOException ioe) { + this.recording.close(); throw new IllegalStateException(ioe.getMessage()); } } @@ -274,56 +262,6 @@ recording.setMaxSize(maxSize); } - @Override - public void onEvent(String eventName, Consumer action) { - stream.onEvent(eventName, action); - } - - @Override - public void onEvent(Consumer action) { - stream.onEvent(action); - } - - @Override - public void onFlush(Runnable action) { - stream.onFlush(action); - } - - @Override - public void onClose(Runnable action) { - stream.onClose(action); - } - - @Override - public void close() { - recording.close(); - stream.close(); - } - - @Override - public boolean remove(Object action) { - return stream.remove(action); - } - - @Override - public void start() { - PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording); - long startNanos = pr.start(); - stream.start(startNanos); - } - - @Override - public void startAsync() { - PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording); - long startNanos = pr.start(); - stream.startAsync(startNanos); - } - - @Override - public void awaitTermination(Duration timeout) { - stream.awaitTermination(timeout); - } - /** * Determines how often events are made available for streaming. * @@ -339,32 +277,82 @@ } @Override - public void awaitTermination() { - stream.awaitTermination(); - } - - @Override public void setReuse(boolean reuse) { - stream.setReuse(reuse); + directoryStream.setReuse(reuse); } @Override public void setOrdered(boolean ordered) { - stream.setOrdered(ordered); + directoryStream.setOrdered(ordered); } @Override public void setStartTime(Instant startTime) { - stream.setStartTime(startTime); + directoryStream.setStartTime(startTime); } @Override public void setEndTime(Instant endTime) { - stream.setStartTime(endTime); + directoryStream.setStartTime(endTime); + } + + @Override + public void onEvent(String eventName, Consumer action) { + directoryStream.onEvent(eventName, action); + } + + @Override + public void onEvent(Consumer action) { + directoryStream.onEvent(action); + } + + @Override + public void onFlush(Runnable action) { + directoryStream.onFlush(action); + } + + @Override + public void onClose(Runnable action) { + directoryStream.onClose(action); } @Override public void onError(Consumer action) { - stream.onError(action); + directoryStream.onError(action); + } + + @Override + public void close() { + recording.close(); + directoryStream.close(); + } + + @Override + public boolean remove(Object action) { + return directoryStream.remove(action); + } + + @Override + public void start() { + PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording); + long startNanos = pr.start(); + directoryStream.start(startNanos); + } + + @Override + public void startAsync() { + PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording); + long startNanos = pr.start(); + directoryStream.startAsync(startNanos); + } + + @Override + public void awaitTermination(Duration timeout) { + directoryStream.awaitTermination(timeout); + } + + @Override + public void awaitTermination() { + directoryStream.awaitTermination(); } } diff -r 269bbe414580 -r be121cbf3284 src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java Mon Sep 02 21:03:40 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java Mon Sep 02 21:08:41 2019 +0200 @@ -436,7 +436,7 @@ }); } - static Thread createThreadWitNoPermissions(String threadName, Runnable runnable) { + public static Thread createThreadWitNoPermissions(String threadName, Runnable runnable) { return doPrivilegedWithReturn(() -> new Thread(runnable, threadName), new Permission[0]); }