--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri May 24 19:39:31 2019 +0200
@@ -38,6 +38,7 @@
import jdk.jfr.internal.Type;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.InternalEventFilter;
import jdk.jfr.internal.consumer.Parser;
import jdk.jfr.internal.consumer.RecordingInput;
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Fri May 24 19:39:31 2019 +0200
@@ -32,141 +32,144 @@
import java.util.Objects;
import java.util.function.Consumer;
-import jdk.jfr.EventType;
+import jdk.jfr.internal.consumer.EventConsumer;
final class EventDirectoryStream implements EventStream {
- public final static class EventConsumer {
- final private String eventName;
- final Consumer<RecordedEvent> action;
+ private static class EventRunner extends EventConsumer {
+ private EventSetLocation location;
+ private EventSet eventSet;
+ private int eventSetIndex;
+ private int eventArrayIndex;
+ private RecordedEvent[] currentEventArray = new RecordedEvent[0];
- EventConsumer(String eventName, Consumer<RecordedEvent> eventConsumer) {
- this.eventName = eventName;
- this.action = eventConsumer;
+ public EventRunner(AccessControlContext acc) throws IOException {
+ super(acc);
}
- public void offer(RecordedEvent event) {
- action.accept(event);
+ public void process() throws Exception, IOException {
+ this.location = EventSetLocation.current();
+ this.eventSet = location.acquire(startNanos, null); // use timestamp
+ // from
+ if (eventSet == null) {
+ return;
+ }
+ while (!isClosed()) {
+ processSegment();
+ runFlushActions();
+ do {
+ if (isClosed()) {
+ return;
+ }
+ currentEventArray = eventSet.readEvents(eventSetIndex);
+ if (currentEventArray == EventSet.END_OF_SET) {
+ eventSet = eventSet.next(eventFilter);
+ if (eventSet == null || isClosed()) {
+ return;
+ }
+ eventSetIndex = 0;
+ continue;
+ }
+ if (currentEventArray == null) {
+ return; // no more events
+ }
+ eventSetIndex++;
+ } while (currentEventArray.length == 0);
+ eventArrayIndex = 0;
+ }
}
- public boolean accepts(EventType eventType) {
- return (eventName == null || eventType.getName().equals(eventName));
+ private void processSegment() {
+ while (eventArrayIndex < currentEventArray.length) {
+ RecordedEvent e = currentEventArray[eventArrayIndex++];
+ if (e == null) {
+ return;
+ }
+ dispatch(e);
+ }
+ }
+
+ public void close() {
+ setClosed(true);
+ // TODO: Data races here, must fix
+ synchronized (this) {
+ if (eventSet != null) {
+ eventSet.release(null);
+ }
+ if (location != null) {
+ location.release();
+ }
+ }
+ runCloseActions();
}
}
- private final EventRunner eventRunner;
- private Thread thread;
- private boolean started;
+ private final EventRunner eventConsumer;
public EventDirectoryStream(AccessControlContext acc) throws IOException {
- eventRunner = new EventRunner(acc);
+ eventConsumer = new EventRunner(acc);
}
public void close() {
- synchronized (eventRunner) {
- eventRunner.close();
- }
+ eventConsumer.close();
}
- public synchronized void onFlush(Runnable action) {
+ public void onFlush(Runnable action) {
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- this.eventRunner.addFlush(action);
- }
+ eventConsumer.onFlush(action);
}
void start(long startNanos) {
- synchronized (eventRunner) {
- if (started) {
- throw new IllegalStateException("Event stream can only be started once");
- }
- started = true;
- eventRunner.setStartNanos(startNanos);
- }
- eventRunner.run();
+ eventConsumer.start(startNanos);
}
@Override
public void start() {
- start(Instant.now().toEpochMilli() * 1000*1000L);
+ start(Instant.now().toEpochMilli() * 1000 * 1000L);
}
@Override
public void startAsync() {
- startAsync(Instant.now().toEpochMilli() * 1000*1000L);
+ startAsync(Instant.now().toEpochMilli() * 1000 * 1000L);
}
void startAsync(long startNanos) {
- synchronized (eventRunner) {
- eventRunner.setStartNanos(startNanos);
- thread = new Thread(eventRunner);
- thread.setDaemon(true);
- thread.start();
- }
+ eventConsumer.startAsync(startNanos);
}
- public void addEventConsumer(EventConsumer action) {
- Objects.requireNonNull(action);
- synchronized (eventRunner) {
- eventRunner.add(action);
- }
- }
-
-
-
@Override
public void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- eventRunner.add(new EventConsumer(null, action));
- }
+ eventConsumer.onEvent(action);
}
@Override
public void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- eventRunner.add(new EventConsumer(eventName, action));
- }
+ eventConsumer.onEvent(eventName, action);
}
@Override
public void onClose(Runnable action) {
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- eventRunner.addCloseAction(action);
- }
+ eventConsumer.addCloseAction(action);
}
@Override
public boolean remove(Object action) {
Objects.requireNonNull(action);
- synchronized (eventRunner) {
- return eventRunner.remove(action);
- }
+ return eventConsumer.remove(action);
}
@Override
public void awaitTermination(Duration timeout) {
Objects.requireNonNull(timeout);
- Thread t = null;
- synchronized (eventRunner) {
- t = thread;
- }
- if (t != null && t != Thread.currentThread()) {
- try {
- t.join(timeout.toMillis());
- } catch (InterruptedException e) {
- // ignore
- }
- }
+ eventConsumer.awaitTermination(timeout);
}
@Override
public void awaitTermination() {
- awaitTermination(Duration.ofMillis(0));
+ eventConsumer.awaitTermination(Duration.ofMillis(0));
}
-
-
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Fri May 24 19:39:31 2019 +0200
@@ -25,85 +25,111 @@
package jdk.jfr.consumer;
+import java.io.IOException;
import java.nio.file.Path;
+import java.security.AccessControlContext;
+import java.security.AccessController;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
+import jdk.jfr.internal.consumer.EventConsumer;
+import jdk.jfr.internal.consumer.RecordingInput;
+
/**
* Implementation of an event stream that operates against a recording file.
*
*/
final class EventFileStream implements EventStream {
- public EventFileStream(Path path) {
+ final static class FileEventConsumer extends EventConsumer {
+ private final RecordingInput input;
+
+ public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException {
+ super(acc);
+ this.input = input;
+ }
+
+ @Override
+ public void process() throws Exception {
+ // TODO This need more work; filter, multiple chunk etc
+ ChunkParser cp = new ChunkParser(input);
+ while (true) {
+ RecordedEvent e = cp.readEvent();
+ dispatch(e);
+ }
+ }
+ }
+
+ private final RecordingInput input;
+ private final FileEventConsumer eventConsumer;
+
+ public EventFileStream(Path path) throws IOException {
Objects.requireNonNull(path);
+ input = new RecordingInput(path.toFile());
+ eventConsumer = new FileEventConsumer(AccessController.getContext(), input);
}
@Override
public void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
- notImplemented();
- }
-
- public void onEvent(EventFilter filter, Consumer<RecordedEvent> action) {
- Objects.requireNonNull(filter);
- Objects.requireNonNull(action);
- notImplemented();
+ eventConsumer.onEvent(action);
}
@Override
public void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
- notImplemented();
+ eventConsumer.onEvent(eventName, action);
}
@Override
public void onFlush(Runnable action) {
Objects.requireNonNull(action);
- notImplemented();
+ eventConsumer.onFlush(action);
}
@Override
public void onClose(Runnable action) {
Objects.requireNonNull(action);
- notImplemented();
+ eventConsumer.addCloseAction(action);
}
@Override
public void close() {
- notImplemented();
+ eventConsumer.setClosed(true);
+ eventConsumer.runCloseActions();
+ try {
+ input.close();
+ } catch (IOException e) {
+ // ignore
+ }
}
@Override
public boolean remove(Object action) {
Objects.requireNonNull(action);
- notImplemented();
- return false;
+ return eventConsumer.remove(action);
}
@Override
public void start() {
- notImplemented();
+ eventConsumer.start(0);
}
@Override
public void startAsync() {
- notImplemented();
+ eventConsumer.startAsync(0);
}
@Override
public void awaitTermination(Duration timeout) {
Objects.requireNonNull(timeout);
+ eventConsumer.awaitTermination(timeout);
}
@Override
public void awaitTermination() {
- notImplemented();
- }
-
- private static void notImplemented() {
- throw new UnsupportedOperationException("Streaming for files not yet implemenetd");
+ eventConsumer.awaitTermination();
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java Fri May 24 19:39:31 2019 +0200
@@ -26,7 +26,6 @@
package jdk.jfr.consumer;
import java.time.Duration;
-import java.time.Instant;
import java.util.Arrays;
import java.util.List;
@@ -45,31 +44,25 @@
return new EventFilter(eventNames.clone(), null, new String[0]);
}
- public EventFilter threshold(Duration threshold) {
+ public EventFilter aboveThreshold(Duration threshold) {
return new EventFilter(eventNames, threshold, fields);
}
- public EventFilter fields(String... fieldNames) {
+ public EventFilter mustHaveFields(String... fieldNames) {
return new EventFilter(eventNames, threshold, fieldNames);
}
- public EventFilter start(Instant instant) {
+
+
+ public EventFilter onlyThreads(Thread... t) {
return this;
}
- public EventFilter end(Instant instant) {
+ public EventFilter onlyThreadIds(long... threadId) {
return this;
}
- public EventFilter threads(Thread... t) {
- return this;
- }
-
- public EventFilter threadIds(long... threadId) {
- return this;
- }
-
- public EventFilter threadNames(String... threadName) {
+ public EventFilter onlyThreadNames(String... threadName) {
return this;
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventRunner.java Tue May 21 22:59:47 2019 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,266 +0,0 @@
-/*
- * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.jfr.consumer;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import jdk.jfr.consumer.EventDirectoryStream.EventConsumer;
-import jdk.jfr.internal.JVM;
-import jdk.jfr.internal.LogLevel;
-import jdk.jfr.internal.LogTag;
-import jdk.jfr.internal.Logger;
-
-class EventRunner implements Runnable {
- private final static VarHandle closedHandle;
- private final static VarHandle consumersHandle;
- private final static VarHandle dispatcherHandle;
- private final static VarHandle flushActionsHandle;
- private final static VarHandle closeActionsHandle;
- static {
- try {
- MethodHandles.Lookup l = MethodHandles.lookup();
- closedHandle = l.findVarHandle(EventRunner.class, "closed", boolean.class);
- consumersHandle = l.findVarHandle(EventRunner.class, "consumers", EventConsumer[].class);
- dispatcherHandle = l.findVarHandle(EventRunner.class, "dispatcher", LongMap.class);
- flushActionsHandle = l.findVarHandle(EventRunner.class, "flushActions", Runnable[].class);
- closeActionsHandle = l.findVarHandle(EventRunner.class, "closeActions", Runnable[].class);
- } catch (ReflectiveOperationException e) {
- throw new InternalError(e);
- }
- }
- // set by VarHandle
- private boolean closed;
- // set by VarHandle
- private EventConsumer[] consumers = new EventConsumer[0];
- // set by VarHandle
- private LongMap<EventConsumer[]> dispatcher = new LongMap<>();
- // set by VarHandle
- private Runnable[] flushActions = new Runnable[0];
- // set by VarHandle
- private Runnable[] closeActions = new Runnable[0];
-
- private final static JVM jvm = JVM.getJVM();
- private final static EventConsumer[] NO_CONSUMERS = new EventConsumer[0];
- private final AccessControlContext accessControlContext;
- private EventSetLocation location;
- private EventSet eventSet;
- private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
- private int eventSetIndex;
- private int eventArrayIndex;
- private RecordedEvent[] currentEventArray = new RecordedEvent[0];
- private volatile long startNanos;
-
- public EventRunner(AccessControlContext acc) throws IOException {
- this.accessControlContext = acc;
- }
-
- public void run() {
- doPriviliged(() -> execute());
- }
-
- void doPriviliged(Runnable r) {
- AccessController.doPrivileged(new PrivilegedAction<Void>() {
- @Override
- public Void run() {
- r.run();
- return null;
- }
- }, accessControlContext);
- }
-
- private void execute() {
- jvm.exclude(Thread.currentThread());
- try {
- process();
- } catch (Throwable e) {
- e.printStackTrace();
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpectedexception iterating consumer.");
- } finally {
- Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
- }
- }
-
- private void process() throws Exception, IOException {
- this.location = EventSetLocation.current();
- this.eventSet = location.acquire(startNanos, null); // use timestamp from
- if (eventSet == null) {
- return;
- }
- while (!closed) {
- processSegment();
- Runnable[] fas = this.flushActions;
- for (int i = 0; i < fas.length; i++) {
- fas[i].run();
- }
- do {
- if (closed) {
- return;
- }
- currentEventArray = eventSet.readEvents(eventSetIndex);
- if (currentEventArray == EventSet.END_OF_SET) {
- eventSet = eventSet.next(eventFilter);
- if (eventSet == null || closed) {
- return;
- }
- eventSetIndex = 0;
- continue;
- }
- if (currentEventArray == null) {
- return; // no more events
- }
- eventSetIndex++;
- } while (currentEventArray.length == 0);
- eventArrayIndex = 0;
- }
- }
-
- private void processSegment() {
- while (eventArrayIndex < currentEventArray.length) {
- RecordedEvent e = currentEventArray[eventArrayIndex++];
- if (e == null) {
- return;
- }
- EventConsumer[] consumerDispatch = dispatcher.get(e.getEventType().getId());
- if (consumerDispatch == null) {
- consumerDispatch = NO_CONSUMERS;
- for (EventConsumer ec : consumers.clone()) {
- if (ec.accepts(e.getEventType())) {
- consumerDispatch = merge(consumerDispatch, ec);
- }
- }
- dispatcher.put(e.getEventType().getId(), consumerDispatch);
- }
- for (int i = 0; i < consumerDispatch.length; i++) {
- consumerDispatch[i].offer(e);
- }
- }
- }
-
- static EventConsumer[] merge(EventConsumer[] current, EventConsumer add) {
- EventConsumer[] array = new EventConsumer[current.length + 1];
- System.arraycopy(current, 0, array, 0, current.length);
- array[current.length] = add;
- return array;
- }
-
- public void add(EventConsumer e) {
- consumersHandle.setVolatile(this, merge(consumers, e));
- dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
- }
-
- private static Runnable[] removeAction(Runnable[] array, Object action) {
- if (array.length == 0) {
- return null;
- }
- boolean remove = false;
- List<Runnable> list = new ArrayList<>();
- for (int i = 0; i < array.length; i++) {
- if (array[i] != action) {
- list.add(array[i]);
- } else {
- remove = true;
- }
- }
- if (remove) {
- return list.toArray(new Runnable[list.size()]);
- }
- return null;
- }
-
- private static Runnable[] addAction(Runnable[] array, Runnable action) {
- ArrayList<Runnable> a = new ArrayList<>();
- a.addAll(Arrays.asList(array));
- a.add(action);
- return a.toArray(new Runnable[0]);
- }
-
- public boolean remove(Object action) {
- boolean remove = false;
- Runnable[] updatedFlushActions = removeAction(flushActions, action);
- if (updatedFlushActions != null) {
- flushActionsHandle.setVolatile(this, updatedFlushActions);
- remove = true;
- }
- Runnable[] updatedCloseActions = removeAction(closeActions, action);
- if (updatedCloseActions != null) {
- closeActionsHandle.setVolatile(this, updatedCloseActions);
- remove = true;
- }
-
- boolean removeConsumer = false;
- List<EventConsumer> list = new ArrayList<>();
- for (int i = 0; i < consumers.length; i++) {
- if (consumers[i].action != action) {
- list.add(consumers[i]);
- } else {
- removeConsumer = true;
- remove = true;
- }
- }
- if (removeConsumer) {
- EventConsumer[] array = list.toArray(new EventConsumer[list.size()]);
- consumersHandle.setVolatile(this, array);
- dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset dispatch
- }
- return remove;
- }
-
- public void addFlush(Runnable action) {
- flushActionsHandle.setVolatile(this, addAction(flushActions, action));
- }
-
- public void close() {
- closedHandle.setVolatile(this, true);
- // TODO: Data races here, must fix
- if (eventSet != null) {
- eventSet.release(null);
- }
- if (location != null) {
- location.release();
- }
-
- Runnable[] cas = this.closeActions;
- for (int i = 0; i < cas.length; i++) {
- cas[i].run();
- }
- }
-
- public void addCloseAction(Runnable action) {
- closeActionsHandle.setVolatile(this, addAction(closeActions, action));
- }
-
- public void setStartNanos(long startNanos) {
- this.startNanos = startNanos;
- }
-}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventSet.java Fri May 24 19:39:31 2019 +0200
@@ -36,6 +36,7 @@
import java.util.concurrent.locks.ReentrantLock;
import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.InternalEventFilter;
import jdk.jfr.internal.consumer.RecordingInput;
/**
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Fri May 24 19:39:31 2019 +0200
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
+import java.time.Instant;
import java.util.function.Consumer;
/**
@@ -53,6 +54,8 @@
*
* @param file location of the file, not {@code null}
* @return an event stream, not {@code null}
+ *
+ * @throws IOException if a stream can't be opened,or an I/O error occurs during reading
*/
public static EventStream openFile(Path file) throws IOException {
throw new UnsupportedOperationException("Not yet implemented");
@@ -60,6 +63,29 @@
}
/**
+ * Creates an event stream starting start time and end time in a file.
+ *
+ * @param file location of the file, not {@code null}
+ *
+ * @param the start start time for the stream, or {@code null} to get data from
+ * the beginning of the
+ *
+ * @param the end end time for the stream, or {@code null} to get data until the
+ * end.
+ *
+ * @throws IllegalArgumentException if {@code end} happens before
+ * {@code start}
+ *
+ * @throws IOException if a stream can't be opened,or an I/O error occurs during reading
+ */
+ public static EventStream openFile(Path file, Instant from, Instant to) throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented");
+// return new EventFileStream(file);
+ }
+
+
+
+ /**
* Performs an action on all events in the stream.
*
* @param action an action to be performed on each {@code RecordedEvent},
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/InternalEventFilter.java Tue May 21 22:59:47 2019 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.jfr.consumer;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public final class InternalEventFilter {
- static final InternalEventFilter ACCEPT_ALL = new InternalEventFilter();
- private final Map<String, Long> thresholds = new HashMap<>();
- private boolean acceptAll;
-
- public static InternalEventFilter merge(Collection<InternalEventFilter> filters) {
- for (InternalEventFilter ef : filters) {
- if (ef.getAcceptAll()) {
- return ACCEPT_ALL;
- }
- }
- if (filters.size() == 1) {
- return filters.iterator().next();
- }
-
- Set<String> eventNames = new HashSet<>();
- for (InternalEventFilter ef : filters) {
- eventNames.addAll(ef.thresholds.keySet());
- }
- InternalEventFilter result = new InternalEventFilter();
- for (String eventName : eventNames) {
- for (InternalEventFilter ef : filters) {
- Long l = ef.thresholds.get(eventName);
- if (l != null) {
- result.setThreshold(eventName, l.longValue());
- }
- }
- }
- return result;
- }
-
- private boolean getAcceptAll() {
- return acceptAll;
- }
-
- public void setAcceptAll() {
- acceptAll = true;
- }
-
- public void setThreshold(String eventName, long nanos) {
- Long l = thresholds.get(eventName);
- if (l != null) {
- l = Math.min(l, nanos);
- } else {
- l = nanos;
- }
- thresholds.put(eventName, l);
- }
-
- public long getThreshold(String eventName) {
- if (acceptAll) {
- return 0;
- }
- Long l = thresholds.get(eventName);
- if (l != null) {
- return l;
- }
- return -1;
- }
- public String toString() {
- if (acceptAll) {
- return "ACCEPT ALL";
- }
- StringBuilder sb = new StringBuilder();
- for (String key : thresholds.keySet().toArray(new String[0])) {
- Long value = thresholds.get(key);
- sb.append(key);
- sb.append(" = ");
- sb.append(value.longValue() / 1_000_000);
- sb.append(" ms");
- }
- return sb.toString();
- }
-}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/LongMap.java Tue May 21 22:59:47 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/LongMap.java Fri May 24 19:39:31 2019 +0200
@@ -30,7 +30,7 @@
import java.util.function.LongConsumer;
@SuppressWarnings("unchecked")
-final class LongMap<T> {
+public final class LongMap<T> {
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final long[] EMPTY_KEYS = new long[0];
private static final Object[] EMPTY_OBJECTS = new Object[0];
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventConsumer.java Fri May 24 19:39:31 2019 +0200
@@ -0,0 +1,315 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.internal.consumer;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+import jdk.jfr.EventType;
+import jdk.jfr.consumer.LongMap;
+import jdk.jfr.consumer.RecordedEvent;
+import jdk.jfr.internal.JVM;
+import jdk.jfr.internal.LogLevel;
+import jdk.jfr.internal.LogTag;
+import jdk.jfr.internal.Logger;
+
+abstract public class EventConsumer implements Runnable {
+
+ private final static class EventDispatcher {
+ public final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
+
+ final private String eventName;
+ final Consumer<RecordedEvent> action;
+
+ public EventDispatcher(Consumer<RecordedEvent> action) {
+ this(null, action);
+ }
+
+ public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
+ this.eventName = eventName;
+ this.action = action;
+ }
+
+ public void offer(RecordedEvent event) {
+ action.accept(event);
+ }
+
+ public boolean accepts(EventType eventType) {
+ return (eventName == null || eventType.getName().equals(eventName));
+ }
+ }
+
+ private final static JVM jvm = JVM.getJVM();
+ private final static VarHandle closedHandle;
+ private final static VarHandle consumersHandle;
+ private final static VarHandle dispatcherHandle;
+ private final static VarHandle flushActionsHandle;
+ private final static VarHandle closeActionsHandle;
+ static {
+ try {
+ MethodHandles.Lookup l = MethodHandles.lookup();
+ closedHandle = l.findVarHandle(EventConsumer.class, "closed", boolean.class);
+ consumersHandle = l.findVarHandle(EventConsumer.class, "consumers", EventDispatcher[].class);
+ dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class);
+ flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class);
+ closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class);
+ } catch (ReflectiveOperationException e) {
+ throw new InternalError(e);
+ }
+ }
+ // set by VarHandle
+ private boolean closed;
+ // set by VarHandle
+ private EventDispatcher[] consumers = new EventDispatcher[0];
+ // set by VarHandle
+ private LongMap<EventDispatcher[]> dispatcher = new LongMap<>();
+ // set by VarHandle
+ private Runnable[] flushActions = new Runnable[0];
+ // set by VarHandle
+ private Runnable[] closeActions = new Runnable[0];
+
+ protected InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
+
+ private final AccessControlContext accessControlContext;
+ private boolean started;
+ private Thread thread;
+
+ protected long startNanos;
+
+ public EventConsumer(AccessControlContext acc) throws IOException {
+ this.accessControlContext = acc;
+ }
+
+ public void run() {
+ doPriviliged(() -> execute());
+ }
+
+ void doPriviliged(Runnable r) {
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ r.run();
+ return null;
+ }
+ }, accessControlContext);
+ }
+
+ private void execute() {
+ jvm.exclude(Thread.currentThread());
+ try {
+ process();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpectedexception iterating consumer.");
+ } finally {
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
+ }
+ }
+
+ public abstract void process() throws Exception;
+
+ public synchronized boolean remove(Object action) {
+ boolean remove = false;
+ Runnable[] updatedFlushActions = removeAction(flushActions, action);
+ if (updatedFlushActions != null) {
+ flushActionsHandle.setVolatile(this, updatedFlushActions);
+ remove = true;
+ }
+ Runnable[] updatedCloseActions = removeAction(closeActions, action);
+ if (updatedCloseActions != null) {
+ closeActionsHandle.setVolatile(this, updatedCloseActions);
+ remove = true;
+ }
+
+ boolean removeConsumer = false;
+ List<EventDispatcher> list = new ArrayList<>();
+ for (int i = 0; i < consumers.length; i++) {
+ if (consumers[i].action != action) {
+ list.add(consumers[i]);
+ } else {
+ removeConsumer = true;
+ remove = true;
+ }
+ }
+ if (removeConsumer) {
+ EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]);
+ consumersHandle.setVolatile(this, array);
+ dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
+ // dispatch
+ }
+ return remove;
+ }
+
+ public void dispatch(RecordedEvent e) {
+ EventDispatcher[] consumerDispatch = dispatcher.get(e.getEventType().getId());
+ if (consumerDispatch == null) {
+ consumerDispatch = EventDispatcher.NO_DISPATCHERS;
+ for (EventDispatcher ec : consumers.clone()) {
+ if (ec.accepts(e.getEventType())) {
+ consumerDispatch = merge(consumerDispatch, ec);
+ }
+ }
+ dispatcher.put(e.getEventType().getId(), consumerDispatch);
+ }
+ for (int i = 0; i < consumerDispatch.length; i++) {
+ consumerDispatch[i].offer(e);
+ }
+
+ }
+
+ public void onEvent(Consumer<RecordedEvent> action) {
+ add(new EventDispatcher(action));
+ }
+
+ public void onEvent(String eventName, Consumer<RecordedEvent> action) {
+ add(new EventDispatcher(eventName, action));
+ }
+
+ private synchronized void add(EventDispatcher e) {
+ consumersHandle.setVolatile(this, merge(consumers, e));
+ dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset
+ }
+
+ public synchronized void onFlush(Runnable action) {
+ flushActionsHandle.setVolatile(this, addAction(flushActions, action));
+ }
+
+ public synchronized void addCloseAction(Runnable action) {
+ closeActionsHandle.setVolatile(this, addAction(closeActions, action));
+ }
+
+ public void setClosed(boolean closed) {
+ closedHandle.setVolatile(this, closed);
+ }
+
+ final public boolean isClosed() {
+ return closed;
+ }
+
+ public void runCloseActions() {
+
+ Runnable[] cas = this.closeActions;
+ for (int i = 0; i < cas.length; i++) {
+ cas[i].run();
+ }
+ }
+
+ public void runFlushActions() {
+ Runnable[] fas = this.flushActions;
+ for (int i = 0; i < fas.length; i++) {
+ fas[i].run();
+ }
+ }
+
+ public synchronized void startAsync(long startNanos) {
+ if (started) {
+ throw new IllegalStateException("Event stream can only be started once");
+ }
+ started = true;
+ setStartNanos(startNanos);
+ thread = new Thread(this);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public void start(long startNanos) {
+ synchronized (this) {
+ if (started) {
+ throw new IllegalStateException("Event stream can only be started once");
+ }
+ started = true;
+ setStartNanos(startNanos);
+ }
+ run();
+ }
+
+ public void awaitTermination(Duration timeout) {
+ Objects.requireNonNull(timeout);
+ Thread t = null;
+ synchronized (this) {
+ t = thread;
+ }
+ if (t != null && t != Thread.currentThread()) {
+ try {
+ t.join(timeout.toMillis());
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ public void awaitTermination() {
+ awaitTermination(Duration.ofMillis(0));
+ }
+
+ private void setStartNanos(long startNanos) {
+ this.startNanos = startNanos;
+ }
+
+ protected static EventDispatcher[] merge(EventDispatcher[] current, EventDispatcher add) {
+ EventDispatcher[] array = new EventDispatcher[current.length + 1];
+ System.arraycopy(current, 0, array, 0, current.length);
+ array[current.length] = add;
+ return array;
+ }
+
+ private static Runnable[] removeAction(Runnable[] array, Object action) {
+ if (array.length == 0) {
+ return null;
+ }
+ boolean remove = false;
+ List<Runnable> list = new ArrayList<>();
+ for (int i = 0; i < array.length; i++) {
+ if (array[i] != action) {
+ list.add(array[i]);
+ } else {
+ remove = true;
+ }
+ }
+ if (remove) {
+ return list.toArray(new Runnable[list.size()]);
+ }
+ return null;
+ }
+
+ private static Runnable[] addAction(Runnable[] array, Runnable action) {
+ ArrayList<Runnable> a = new ArrayList<>();
+ a.addAll(Arrays.asList(array));
+ a.add(action);
+ return a.toArray(new Runnable[0]);
+ }
+
+}
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/InternalEventFilter.java Fri May 24 19:39:31 2019 +0200
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.internal.consumer;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public final class InternalEventFilter {
+ public static final InternalEventFilter ACCEPT_ALL = new InternalEventFilter();
+ private final Map<String, Long> thresholds = new HashMap<>();
+ private boolean acceptAll;
+
+ public static InternalEventFilter merge(Collection<InternalEventFilter> filters) {
+ for (InternalEventFilter ef : filters) {
+ if (ef.getAcceptAll()) {
+ return ACCEPT_ALL;
+ }
+ }
+ if (filters.size() == 1) {
+ return filters.iterator().next();
+ }
+
+ Set<String> eventNames = new HashSet<>();
+ for (InternalEventFilter ef : filters) {
+ eventNames.addAll(ef.thresholds.keySet());
+ }
+ InternalEventFilter result = new InternalEventFilter();
+ for (String eventName : eventNames) {
+ for (InternalEventFilter ef : filters) {
+ Long l = ef.thresholds.get(eventName);
+ if (l != null) {
+ result.setThreshold(eventName, l.longValue());
+ }
+ }
+ }
+ return result;
+ }
+
+ private boolean getAcceptAll() {
+ return acceptAll;
+ }
+
+ public void setAcceptAll() {
+ acceptAll = true;
+ }
+
+ public void setThreshold(String eventName, long nanos) {
+ Long l = thresholds.get(eventName);
+ if (l != null) {
+ l = Math.min(l, nanos);
+ } else {
+ l = nanos;
+ }
+ thresholds.put(eventName, l);
+ }
+
+ public long getThreshold(String eventName) {
+ if (acceptAll) {
+ return 0;
+ }
+ Long l = thresholds.get(eventName);
+ if (l != null) {
+ return l;
+ }
+ return -1;
+ }
+ public String toString() {
+ if (acceptAll) {
+ return "ACCEPT ALL";
+ }
+ StringBuilder sb = new StringBuilder();
+ for (String key : thresholds.keySet().toArray(new String[0])) {
+ Long value = thresholds.get(key);
+ sb.append(key);
+ sb.append(" = ");
+ sb.append(value.longValue() / 1_000_000);
+ sb.append(" ms");
+ }
+ return sb.toString();
+ }
+}
\ No newline at end of file