# HG changeset patch # User egahlin # Date 1558719571 -7200 # Node ID 50ca040843ea882e8a9c3a4af0ec6428c3288a0c # Parent 29635339ef621bb555c7356292c253d9e1134779 Prepare infrastructure for multiple implementations of EventStream diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Tue May 21 22:59:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri May 24 19:39:31 2019 +0200 @@ -38,6 +38,7 @@ import jdk.jfr.internal.Type; import jdk.jfr.internal.Utils; import jdk.jfr.internal.consumer.ChunkHeader; +import jdk.jfr.internal.consumer.InternalEventFilter; import jdk.jfr.internal.consumer.Parser; import jdk.jfr.internal.consumer.RecordingInput; diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Tue May 21 22:59:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri May 24 19:39:31 2019 +0200 @@ -32,141 +32,144 @@ import java.util.Objects; import java.util.function.Consumer; -import jdk.jfr.EventType; +import jdk.jfr.internal.consumer.EventConsumer; final class EventDirectoryStream implements EventStream { - public final static class EventConsumer { - final private String eventName; - final Consumer action; + private static class EventRunner extends EventConsumer { + private EventSetLocation location; + private EventSet eventSet; + private int eventSetIndex; + private int eventArrayIndex; + private RecordedEvent[] currentEventArray = new RecordedEvent[0]; - EventConsumer(String eventName, Consumer eventConsumer) { - this.eventName = eventName; - this.action = eventConsumer; + public EventRunner(AccessControlContext acc) throws IOException { + super(acc); } - public void offer(RecordedEvent event) { - action.accept(event); + public void process() throws Exception, IOException { + this.location = EventSetLocation.current(); + this.eventSet = location.acquire(startNanos, null); // use timestamp + // from + if (eventSet == null) { + return; + } + while (!isClosed()) { + processSegment(); + runFlushActions(); + do { + if (isClosed()) { + return; + } + currentEventArray = eventSet.readEvents(eventSetIndex); + if (currentEventArray == EventSet.END_OF_SET) { + eventSet = eventSet.next(eventFilter); + if (eventSet == null || isClosed()) { + return; + } + eventSetIndex = 0; + continue; + } + if (currentEventArray == null) { + return; // no more events + } + eventSetIndex++; + } while (currentEventArray.length == 0); + eventArrayIndex = 0; + } } - public boolean accepts(EventType eventType) { - return (eventName == null || eventType.getName().equals(eventName)); + private void processSegment() { + while (eventArrayIndex < currentEventArray.length) { + RecordedEvent e = currentEventArray[eventArrayIndex++]; + if (e == null) { + return; + } + dispatch(e); + } + } + + public void close() { + setClosed(true); + // TODO: Data races here, must fix + synchronized (this) { + if (eventSet != null) { + eventSet.release(null); + } + if (location != null) { + location.release(); + } + } + runCloseActions(); } } - private final EventRunner eventRunner; - private Thread thread; - private boolean started; + private final EventRunner eventConsumer; public EventDirectoryStream(AccessControlContext acc) throws IOException { - eventRunner = new EventRunner(acc); + eventConsumer = new EventRunner(acc); } public void close() { - synchronized (eventRunner) { - eventRunner.close(); - } + eventConsumer.close(); } - public synchronized void onFlush(Runnable action) { + public void onFlush(Runnable action) { Objects.requireNonNull(action); - synchronized (eventRunner) { - this.eventRunner.addFlush(action); - } + eventConsumer.onFlush(action); } void start(long startNanos) { - synchronized (eventRunner) { - if (started) { - throw new IllegalStateException("Event stream can only be started once"); - } - started = true; - eventRunner.setStartNanos(startNanos); - } - eventRunner.run(); + eventConsumer.start(startNanos); } @Override public void start() { - start(Instant.now().toEpochMilli() * 1000*1000L); + start(Instant.now().toEpochMilli() * 1000 * 1000L); } @Override public void startAsync() { - startAsync(Instant.now().toEpochMilli() * 1000*1000L); + startAsync(Instant.now().toEpochMilli() * 1000 * 1000L); } void startAsync(long startNanos) { - synchronized (eventRunner) { - eventRunner.setStartNanos(startNanos); - thread = new Thread(eventRunner); - thread.setDaemon(true); - thread.start(); - } + eventConsumer.startAsync(startNanos); } - public void addEventConsumer(EventConsumer action) { - Objects.requireNonNull(action); - synchronized (eventRunner) { - eventRunner.add(action); - } - } - - - @Override public void onEvent(Consumer action) { Objects.requireNonNull(action); - synchronized (eventRunner) { - eventRunner.add(new EventConsumer(null, action)); - } + eventConsumer.onEvent(action); } @Override public void onEvent(String eventName, Consumer action) { Objects.requireNonNull(eventName); Objects.requireNonNull(action); - synchronized (eventRunner) { - eventRunner.add(new EventConsumer(eventName, action)); - } + eventConsumer.onEvent(eventName, action); } @Override public void onClose(Runnable action) { Objects.requireNonNull(action); - synchronized (eventRunner) { - eventRunner.addCloseAction(action); - } + eventConsumer.addCloseAction(action); } @Override public boolean remove(Object action) { Objects.requireNonNull(action); - synchronized (eventRunner) { - return eventRunner.remove(action); - } + return eventConsumer.remove(action); } @Override public void awaitTermination(Duration timeout) { Objects.requireNonNull(timeout); - Thread t = null; - synchronized (eventRunner) { - t = thread; - } - if (t != null && t != Thread.currentThread()) { - try { - t.join(timeout.toMillis()); - } catch (InterruptedException e) { - // ignore - } - } + eventConsumer.awaitTermination(timeout); } @Override public void awaitTermination() { - awaitTermination(Duration.ofMillis(0)); + eventConsumer.awaitTermination(Duration.ofMillis(0)); } - - } diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Tue May 21 22:59:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri May 24 19:39:31 2019 +0200 @@ -25,85 +25,111 @@ package jdk.jfr.consumer; +import java.io.IOException; import java.nio.file.Path; +import java.security.AccessControlContext; +import java.security.AccessController; import java.time.Duration; import java.util.Objects; import java.util.function.Consumer; +import jdk.jfr.internal.consumer.EventConsumer; +import jdk.jfr.internal.consumer.RecordingInput; + /** * Implementation of an event stream that operates against a recording file. * */ final class EventFileStream implements EventStream { - public EventFileStream(Path path) { + final static class FileEventConsumer extends EventConsumer { + private final RecordingInput input; + + public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException { + super(acc); + this.input = input; + } + + @Override + public void process() throws Exception { + // TODO This need more work; filter, multiple chunk etc + ChunkParser cp = new ChunkParser(input); + while (true) { + RecordedEvent e = cp.readEvent(); + dispatch(e); + } + } + } + + private final RecordingInput input; + private final FileEventConsumer eventConsumer; + + public EventFileStream(Path path) throws IOException { Objects.requireNonNull(path); + input = new RecordingInput(path.toFile()); + eventConsumer = new FileEventConsumer(AccessController.getContext(), input); } @Override public void onEvent(Consumer action) { Objects.requireNonNull(action); - notImplemented(); - } - - public void onEvent(EventFilter filter, Consumer action) { - Objects.requireNonNull(filter); - Objects.requireNonNull(action); - notImplemented(); + eventConsumer.onEvent(action); } @Override public void onEvent(String eventName, Consumer action) { Objects.requireNonNull(eventName); Objects.requireNonNull(action); - notImplemented(); + eventConsumer.onEvent(eventName, action); } @Override public void onFlush(Runnable action) { Objects.requireNonNull(action); - notImplemented(); + eventConsumer.onFlush(action); } @Override public void onClose(Runnable action) { Objects.requireNonNull(action); - notImplemented(); + eventConsumer.addCloseAction(action); } @Override public void close() { - notImplemented(); + eventConsumer.setClosed(true); + eventConsumer.runCloseActions(); + try { + input.close(); + } catch (IOException e) { + // ignore + } } @Override public boolean remove(Object action) { Objects.requireNonNull(action); - notImplemented(); - return false; + return eventConsumer.remove(action); } @Override public void start() { - notImplemented(); + eventConsumer.start(0); } @Override public void startAsync() { - notImplemented(); + eventConsumer.startAsync(0); } @Override public void awaitTermination(Duration timeout) { Objects.requireNonNull(timeout); + eventConsumer.awaitTermination(timeout); } @Override public void awaitTermination() { - notImplemented(); - } - - private static void notImplemented() { - throw new UnsupportedOperationException("Streaming for files not yet implemenetd"); + eventConsumer.awaitTermination(); } } diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java Tue May 21 22:59:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java Fri May 24 19:39:31 2019 +0200 @@ -26,7 +26,6 @@ package jdk.jfr.consumer; import java.time.Duration; -import java.time.Instant; import java.util.Arrays; import java.util.List; @@ -45,31 +44,25 @@ return new EventFilter(eventNames.clone(), null, new String[0]); } - public EventFilter threshold(Duration threshold) { + public EventFilter aboveThreshold(Duration threshold) { return new EventFilter(eventNames, threshold, fields); } - public EventFilter fields(String... fieldNames) { + public EventFilter mustHaveFields(String... fieldNames) { return new EventFilter(eventNames, threshold, fieldNames); } - public EventFilter start(Instant instant) { + + + public EventFilter onlyThreads(Thread... t) { return this; } - public EventFilter end(Instant instant) { + public EventFilter onlyThreadIds(long... threadId) { return this; } - public EventFilter threads(Thread... t) { - return this; - } - - public EventFilter threadIds(long... threadId) { - return this; - } - - public EventFilter threadNames(String... threadName) { + public EventFilter onlyThreadNames(String... threadName) { return this; } diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/consumer/EventRunner.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventRunner.java Tue May 21 22:59:47 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,266 +0,0 @@ -/* - * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.jfr.consumer; - -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; -import java.security.AccessControlContext; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import jdk.jfr.consumer.EventDirectoryStream.EventConsumer; -import jdk.jfr.internal.JVM; -import jdk.jfr.internal.LogLevel; -import jdk.jfr.internal.LogTag; -import jdk.jfr.internal.Logger; - -class EventRunner implements Runnable { - private final static VarHandle closedHandle; - private final static VarHandle consumersHandle; - private final static VarHandle dispatcherHandle; - private final static VarHandle flushActionsHandle; - private final static VarHandle closeActionsHandle; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - closedHandle = l.findVarHandle(EventRunner.class, "closed", boolean.class); - consumersHandle = l.findVarHandle(EventRunner.class, "consumers", EventConsumer[].class); - dispatcherHandle = l.findVarHandle(EventRunner.class, "dispatcher", LongMap.class); - flushActionsHandle = l.findVarHandle(EventRunner.class, "flushActions", Runnable[].class); - closeActionsHandle = l.findVarHandle(EventRunner.class, "closeActions", Runnable[].class); - } catch (ReflectiveOperationException e) { - throw new InternalError(e); - } - } - // set by VarHandle - private boolean closed; - // set by VarHandle - private EventConsumer[] consumers = new EventConsumer[0]; - // set by VarHandle - private LongMap dispatcher = new LongMap<>(); - // set by VarHandle - private Runnable[] flushActions = new Runnable[0]; - // set by VarHandle - private Runnable[] closeActions = new Runnable[0]; - - private final static JVM jvm = JVM.getJVM(); - private final static EventConsumer[] NO_CONSUMERS = new EventConsumer[0]; - private final AccessControlContext accessControlContext; - private EventSetLocation location; - private EventSet eventSet; - private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; - private int eventSetIndex; - private int eventArrayIndex; - private RecordedEvent[] currentEventArray = new RecordedEvent[0]; - private volatile long startNanos; - - public EventRunner(AccessControlContext acc) throws IOException { - this.accessControlContext = acc; - } - - public void run() { - doPriviliged(() -> execute()); - } - - void doPriviliged(Runnable r) { - AccessController.doPrivileged(new PrivilegedAction() { - @Override - public Void run() { - r.run(); - return null; - } - }, accessControlContext); - } - - private void execute() { - jvm.exclude(Thread.currentThread()); - try { - process(); - } catch (Throwable e) { - e.printStackTrace(); - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpectedexception iterating consumer."); - } finally { - Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); - } - } - - private void process() throws Exception, IOException { - this.location = EventSetLocation.current(); - this.eventSet = location.acquire(startNanos, null); // use timestamp from - if (eventSet == null) { - return; - } - while (!closed) { - processSegment(); - Runnable[] fas = this.flushActions; - for (int i = 0; i < fas.length; i++) { - fas[i].run(); - } - do { - if (closed) { - return; - } - currentEventArray = eventSet.readEvents(eventSetIndex); - if (currentEventArray == EventSet.END_OF_SET) { - eventSet = eventSet.next(eventFilter); - if (eventSet == null || closed) { - return; - } - eventSetIndex = 0; - continue; - } - if (currentEventArray == null) { - return; // no more events - } - eventSetIndex++; - } while (currentEventArray.length == 0); - eventArrayIndex = 0; - } - } - - private void processSegment() { - while (eventArrayIndex < currentEventArray.length) { - RecordedEvent e = currentEventArray[eventArrayIndex++]; - if (e == null) { - return; - } - EventConsumer[] consumerDispatch = dispatcher.get(e.getEventType().getId()); - if (consumerDispatch == null) { - consumerDispatch = NO_CONSUMERS; - for (EventConsumer ec : consumers.clone()) { - if (ec.accepts(e.getEventType())) { - consumerDispatch = merge(consumerDispatch, ec); - } - } - dispatcher.put(e.getEventType().getId(), consumerDispatch); - } - for (int i = 0; i < consumerDispatch.length; i++) { - consumerDispatch[i].offer(e); - } - } - } - - static EventConsumer[] merge(EventConsumer[] current, EventConsumer add) { - EventConsumer[] array = new EventConsumer[current.length + 1]; - System.arraycopy(current, 0, array, 0, current.length); - array[current.length] = add; - return array; - } - - public void add(EventConsumer e) { - consumersHandle.setVolatile(this, merge(consumers, e)); - dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset - } - - private static Runnable[] removeAction(Runnable[] array, Object action) { - if (array.length == 0) { - return null; - } - boolean remove = false; - List list = new ArrayList<>(); - for (int i = 0; i < array.length; i++) { - if (array[i] != action) { - list.add(array[i]); - } else { - remove = true; - } - } - if (remove) { - return list.toArray(new Runnable[list.size()]); - } - return null; - } - - private static Runnable[] addAction(Runnable[] array, Runnable action) { - ArrayList a = new ArrayList<>(); - a.addAll(Arrays.asList(array)); - a.add(action); - return a.toArray(new Runnable[0]); - } - - public boolean remove(Object action) { - boolean remove = false; - Runnable[] updatedFlushActions = removeAction(flushActions, action); - if (updatedFlushActions != null) { - flushActionsHandle.setVolatile(this, updatedFlushActions); - remove = true; - } - Runnable[] updatedCloseActions = removeAction(closeActions, action); - if (updatedCloseActions != null) { - closeActionsHandle.setVolatile(this, updatedCloseActions); - remove = true; - } - - boolean removeConsumer = false; - List list = new ArrayList<>(); - for (int i = 0; i < consumers.length; i++) { - if (consumers[i].action != action) { - list.add(consumers[i]); - } else { - removeConsumer = true; - remove = true; - } - } - if (removeConsumer) { - EventConsumer[] array = list.toArray(new EventConsumer[list.size()]); - consumersHandle.setVolatile(this, array); - dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset dispatch - } - return remove; - } - - public void addFlush(Runnable action) { - flushActionsHandle.setVolatile(this, addAction(flushActions, action)); - } - - public void close() { - closedHandle.setVolatile(this, true); - // TODO: Data races here, must fix - if (eventSet != null) { - eventSet.release(null); - } - if (location != null) { - location.release(); - } - - Runnable[] cas = this.closeActions; - for (int i = 0; i < cas.length; i++) { - cas[i].run(); - } - } - - public void addCloseAction(Runnable action) { - closeActionsHandle.setVolatile(this, addAction(closeActions, action)); - } - - public void setStartNanos(long startNanos) { - this.startNanos = startNanos; - } -} \ No newline at end of file diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java Tue May 21 22:59:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java Fri May 24 19:39:31 2019 +0200 @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantLock; import jdk.jfr.internal.consumer.ChunkHeader; +import jdk.jfr.internal.consumer.InternalEventFilter; import jdk.jfr.internal.consumer.RecordingInput; /** diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Tue May 21 22:59:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri May 24 19:39:31 2019 +0200 @@ -28,6 +28,7 @@ import java.io.IOException; import java.nio.file.Path; import java.time.Duration; +import java.time.Instant; import java.util.function.Consumer; /** @@ -53,6 +54,8 @@ * * @param file location of the file, not {@code null} * @return an event stream, not {@code null} + * + * @throws IOException if a stream can't be opened,or an I/O error occurs during reading */ public static EventStream openFile(Path file) throws IOException { throw new UnsupportedOperationException("Not yet implemented"); @@ -60,6 +63,29 @@ } /** + * Creates an event stream starting start time and end time in a file. + * + * @param file location of the file, not {@code null} + * + * @param the start start time for the stream, or {@code null} to get data from + * the beginning of the + * + * @param the end end time for the stream, or {@code null} to get data until the + * end. + * + * @throws IllegalArgumentException if {@code end} happens before + * {@code start} + * + * @throws IOException if a stream can't be opened,or an I/O error occurs during reading + */ + public static EventStream openFile(Path file, Instant from, Instant to) throws IOException { + throw new UnsupportedOperationException("Not yet implemented"); +// return new EventFileStream(file); + } + + + + /** * Performs an action on all events in the stream. * * @param action an action to be performed on each {@code RecordedEvent}, diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/consumer/InternalEventFilter.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/InternalEventFilter.java Tue May 21 22:59:47 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.jfr.consumer; - -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -public final class InternalEventFilter { - static final InternalEventFilter ACCEPT_ALL = new InternalEventFilter(); - private final Map thresholds = new HashMap<>(); - private boolean acceptAll; - - public static InternalEventFilter merge(Collection filters) { - for (InternalEventFilter ef : filters) { - if (ef.getAcceptAll()) { - return ACCEPT_ALL; - } - } - if (filters.size() == 1) { - return filters.iterator().next(); - } - - Set eventNames = new HashSet<>(); - for (InternalEventFilter ef : filters) { - eventNames.addAll(ef.thresholds.keySet()); - } - InternalEventFilter result = new InternalEventFilter(); - for (String eventName : eventNames) { - for (InternalEventFilter ef : filters) { - Long l = ef.thresholds.get(eventName); - if (l != null) { - result.setThreshold(eventName, l.longValue()); - } - } - } - return result; - } - - private boolean getAcceptAll() { - return acceptAll; - } - - public void setAcceptAll() { - acceptAll = true; - } - - public void setThreshold(String eventName, long nanos) { - Long l = thresholds.get(eventName); - if (l != null) { - l = Math.min(l, nanos); - } else { - l = nanos; - } - thresholds.put(eventName, l); - } - - public long getThreshold(String eventName) { - if (acceptAll) { - return 0; - } - Long l = thresholds.get(eventName); - if (l != null) { - return l; - } - return -1; - } - public String toString() { - if (acceptAll) { - return "ACCEPT ALL"; - } - StringBuilder sb = new StringBuilder(); - for (String key : thresholds.keySet().toArray(new String[0])) { - Long value = thresholds.get(key); - sb.append(key); - sb.append(" = "); - sb.append(value.longValue() / 1_000_000); - sb.append(" ms"); - } - return sb.toString(); - } -} \ No newline at end of file diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/consumer/LongMap.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/LongMap.java Tue May 21 22:59:47 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/LongMap.java Fri May 24 19:39:31 2019 +0200 @@ -30,7 +30,7 @@ import java.util.function.LongConsumer; @SuppressWarnings("unchecked") -final class LongMap { +public final class LongMap { private static final int MAXIMUM_CAPACITY = 1 << 30; private static final long[] EMPTY_KEYS = new long[0]; private static final Object[] EMPTY_OBJECTS = new Object[0]; diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Fri May 24 19:39:31 2019 +0200 @@ -0,0 +1,315 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.jfr.internal.consumer; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + +import jdk.jfr.EventType; +import jdk.jfr.consumer.LongMap; +import jdk.jfr.consumer.RecordedEvent; +import jdk.jfr.internal.JVM; +import jdk.jfr.internal.LogLevel; +import jdk.jfr.internal.LogTag; +import jdk.jfr.internal.Logger; + +abstract public class EventConsumer implements Runnable { + + private final static class EventDispatcher { + public final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; + + final private String eventName; + final Consumer action; + + public EventDispatcher(Consumer action) { + this(null, action); + } + + public EventDispatcher(String eventName, Consumer action) { + this.eventName = eventName; + this.action = action; + } + + public void offer(RecordedEvent event) { + action.accept(event); + } + + public boolean accepts(EventType eventType) { + return (eventName == null || eventType.getName().equals(eventName)); + } + } + + private final static JVM jvm = JVM.getJVM(); + private final static VarHandle closedHandle; + private final static VarHandle consumersHandle; + private final static VarHandle dispatcherHandle; + private final static VarHandle flushActionsHandle; + private final static VarHandle closeActionsHandle; + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + closedHandle = l.findVarHandle(EventConsumer.class, "closed", boolean.class); + consumersHandle = l.findVarHandle(EventConsumer.class, "consumers", EventDispatcher[].class); + dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class); + flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class); + closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class); + } catch (ReflectiveOperationException e) { + throw new InternalError(e); + } + } + // set by VarHandle + private boolean closed; + // set by VarHandle + private EventDispatcher[] consumers = new EventDispatcher[0]; + // set by VarHandle + private LongMap dispatcher = new LongMap<>(); + // set by VarHandle + private Runnable[] flushActions = new Runnable[0]; + // set by VarHandle + private Runnable[] closeActions = new Runnable[0]; + + protected InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; + + private final AccessControlContext accessControlContext; + private boolean started; + private Thread thread; + + protected long startNanos; + + public EventConsumer(AccessControlContext acc) throws IOException { + this.accessControlContext = acc; + } + + public void run() { + doPriviliged(() -> execute()); + } + + void doPriviliged(Runnable r) { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + r.run(); + return null; + } + }, accessControlContext); + } + + private void execute() { + jvm.exclude(Thread.currentThread()); + try { + process(); + } catch (Throwable e) { + e.printStackTrace(); + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpectedexception iterating consumer."); + } finally { + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); + } + } + + public abstract void process() throws Exception; + + public synchronized boolean remove(Object action) { + boolean remove = false; + Runnable[] updatedFlushActions = removeAction(flushActions, action); + if (updatedFlushActions != null) { + flushActionsHandle.setVolatile(this, updatedFlushActions); + remove = true; + } + Runnable[] updatedCloseActions = removeAction(closeActions, action); + if (updatedCloseActions != null) { + closeActionsHandle.setVolatile(this, updatedCloseActions); + remove = true; + } + + boolean removeConsumer = false; + List list = new ArrayList<>(); + for (int i = 0; i < consumers.length; i++) { + if (consumers[i].action != action) { + list.add(consumers[i]); + } else { + removeConsumer = true; + remove = true; + } + } + if (removeConsumer) { + EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]); + consumersHandle.setVolatile(this, array); + dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset + // dispatch + } + return remove; + } + + public void dispatch(RecordedEvent e) { + EventDispatcher[] consumerDispatch = dispatcher.get(e.getEventType().getId()); + if (consumerDispatch == null) { + consumerDispatch = EventDispatcher.NO_DISPATCHERS; + for (EventDispatcher ec : consumers.clone()) { + if (ec.accepts(e.getEventType())) { + consumerDispatch = merge(consumerDispatch, ec); + } + } + dispatcher.put(e.getEventType().getId(), consumerDispatch); + } + for (int i = 0; i < consumerDispatch.length; i++) { + consumerDispatch[i].offer(e); + } + + } + + public void onEvent(Consumer action) { + add(new EventDispatcher(action)); + } + + public void onEvent(String eventName, Consumer action) { + add(new EventDispatcher(eventName, action)); + } + + private synchronized void add(EventDispatcher e) { + consumersHandle.setVolatile(this, merge(consumers, e)); + dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset + } + + public synchronized void onFlush(Runnable action) { + flushActionsHandle.setVolatile(this, addAction(flushActions, action)); + } + + public synchronized void addCloseAction(Runnable action) { + closeActionsHandle.setVolatile(this, addAction(closeActions, action)); + } + + public void setClosed(boolean closed) { + closedHandle.setVolatile(this, closed); + } + + final public boolean isClosed() { + return closed; + } + + public void runCloseActions() { + + Runnable[] cas = this.closeActions; + for (int i = 0; i < cas.length; i++) { + cas[i].run(); + } + } + + public void runFlushActions() { + Runnable[] fas = this.flushActions; + for (int i = 0; i < fas.length; i++) { + fas[i].run(); + } + } + + public synchronized void startAsync(long startNanos) { + if (started) { + throw new IllegalStateException("Event stream can only be started once"); + } + started = true; + setStartNanos(startNanos); + thread = new Thread(this); + thread.setDaemon(true); + thread.start(); + } + + public void start(long startNanos) { + synchronized (this) { + if (started) { + throw new IllegalStateException("Event stream can only be started once"); + } + started = true; + setStartNanos(startNanos); + } + run(); + } + + public void awaitTermination(Duration timeout) { + Objects.requireNonNull(timeout); + Thread t = null; + synchronized (this) { + t = thread; + } + if (t != null && t != Thread.currentThread()) { + try { + t.join(timeout.toMillis()); + } catch (InterruptedException e) { + // ignore + } + } + } + + public void awaitTermination() { + awaitTermination(Duration.ofMillis(0)); + } + + private void setStartNanos(long startNanos) { + this.startNanos = startNanos; + } + + protected static EventDispatcher[] merge(EventDispatcher[] current, EventDispatcher add) { + EventDispatcher[] array = new EventDispatcher[current.length + 1]; + System.arraycopy(current, 0, array, 0, current.length); + array[current.length] = add; + return array; + } + + private static Runnable[] removeAction(Runnable[] array, Object action) { + if (array.length == 0) { + return null; + } + boolean remove = false; + List list = new ArrayList<>(); + for (int i = 0; i < array.length; i++) { + if (array[i] != action) { + list.add(array[i]); + } else { + remove = true; + } + } + if (remove) { + return list.toArray(new Runnable[list.size()]); + } + return null; + } + + private static Runnable[] addAction(Runnable[] array, Runnable action) { + ArrayList a = new ArrayList<>(); + a.addAll(Arrays.asList(array)); + a.add(action); + return a.toArray(new Runnable[0]); + } + +} \ No newline at end of file diff -r 29635339ef62 -r 50ca040843ea src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/InternalEventFilter.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/InternalEventFilter.java Fri May 24 19:39:31 2019 +0200 @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.jfr.internal.consumer; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public final class InternalEventFilter { + public static final InternalEventFilter ACCEPT_ALL = new InternalEventFilter(); + private final Map thresholds = new HashMap<>(); + private boolean acceptAll; + + public static InternalEventFilter merge(Collection filters) { + for (InternalEventFilter ef : filters) { + if (ef.getAcceptAll()) { + return ACCEPT_ALL; + } + } + if (filters.size() == 1) { + return filters.iterator().next(); + } + + Set eventNames = new HashSet<>(); + for (InternalEventFilter ef : filters) { + eventNames.addAll(ef.thresholds.keySet()); + } + InternalEventFilter result = new InternalEventFilter(); + for (String eventName : eventNames) { + for (InternalEventFilter ef : filters) { + Long l = ef.thresholds.get(eventName); + if (l != null) { + result.setThreshold(eventName, l.longValue()); + } + } + } + return result; + } + + private boolean getAcceptAll() { + return acceptAll; + } + + public void setAcceptAll() { + acceptAll = true; + } + + public void setThreshold(String eventName, long nanos) { + Long l = thresholds.get(eventName); + if (l != null) { + l = Math.min(l, nanos); + } else { + l = nanos; + } + thresholds.put(eventName, l); + } + + public long getThreshold(String eventName) { + if (acceptAll) { + return 0; + } + Long l = thresholds.get(eventName); + if (l != null) { + return l; + } + return -1; + } + public String toString() { + if (acceptAll) { + return "ACCEPT ALL"; + } + StringBuilder sb = new StringBuilder(); + for (String key : thresholds.keySet().toArray(new String[0])) { + Long value = thresholds.get(key); + sb.append(key); + sb.append(" = "); + sb.append(value.longValue() / 1_000_000); + sb.append(" ms"); + } + return sb.toString(); + } +} \ No newline at end of file