--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java Thu Sep 05 16:46:50 2019 +0200
@@ -33,10 +33,10 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import jdk.jfr.EventType;
@@ -44,10 +44,7 @@
import jdk.jfr.internal.LogLevel;
import jdk.jfr.internal.LogTag;
import jdk.jfr.internal.Logger;
-import jdk.jfr.internal.LongMap;
import jdk.jfr.internal.SecuritySupport;
-import jdk.jfr.internal.Utils;
-import jdk.jfr.internal.consumer.InternalEventFilter;
/*
* Purpose of this class is to simplify the implementation of
@@ -62,254 +59,10 @@
*/
abstract class AbstractEventStream implements EventStream {
- protected static final class StreamConfiguration {
- private static final Runnable[] NO_ACTIONS = new Runnable[0];
-
- private Consumer<?>[] errorActions = new Consumer<?>[0];
- private Runnable[] flushActions = NO_ACTIONS;
- private Runnable[] closeActions = NO_ACTIONS;
- private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
- private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
- private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
-
- private boolean changedConfiguration = false;
- private boolean closed = false;
- private boolean reuse = true;
- private boolean ordered = true;
- private Instant startTime = null;
- private Instant endTime = null;
- private boolean started = false;
- private long startNanos = 0;
- private long endNanos = Long.MAX_VALUE;
-
- public StreamConfiguration(StreamConfiguration configuration) {
- this.flushActions = configuration.flushActions;
- this.closeActions = configuration.closeActions;
- this.errorActions = configuration.errorActions;
- this.dispatchers = configuration.dispatchers;
- this.eventFilter = configuration.eventFilter;
- this.closed = configuration.closed;
- this.reuse = configuration.reuse;
- this.ordered = configuration.ordered;
- this.startTime = configuration.startTime;
- this.endTime = configuration.endTime;
- this.started = configuration.started;
- this.startNanos = configuration.startNanos;
- this.endNanos = configuration.endNanos;
- this.dispatcherLookup = configuration.dispatcherLookup;
- }
-
- public StreamConfiguration() {
- }
-
- public StreamConfiguration remove(Object action) {
- flushActions = remove(flushActions, action);
- closeActions = remove(closeActions, action);
- dispatchers = removeDispatch(dispatchers, action);
- return this;
- }
-
- public StreamConfiguration addDispatcher(EventDispatcher e) {
- dispatchers = add(dispatchers, e);
- eventFilter = buildFilter(dispatchers);
- dispatcherLookup = new LongMap<>();
- return this;
- }
-
- public StreamConfiguration addFlushAction(Runnable action) {
- flushActions = add(flushActions, action);
- return this;
- }
-
- public StreamConfiguration addCloseAction(Runnable action) {
- closeActions = add(closeActions, action);
- return this;
- }
-
- public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
- errorActions = add(errorActions, action);
- return this;
- }
-
- public StreamConfiguration setClosed(boolean closed) {
- this.closed = closed;
- changedConfiguration = true;
- return this;
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- public Runnable[] getCloseActions() {
- return closeActions;
- }
-
- public Runnable[] getFlushActions() {
- return flushActions;
- }
-
- private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
- List<EventDispatcher> list = new ArrayList<>(array.length);
- boolean modified = false;
- for (int i = 0; i < array.length; i++) {
- if (array[i].action != action) {
- list.add(array[i]);
- } else {
- modified = true;
- }
- }
- EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
- if (modified) {
- eventFilter = buildFilter(result);
- dispatcherLookup = new LongMap<>();
- changedConfiguration = true;
- }
- return result;
- }
-
- private <T> T[] remove(T[] array, Object action) {
- List<T> list = new ArrayList<>(array.length);
- for (int i = 0; i < array.length; i++) {
- if (array[i] != action) {
- list.add(array[i]);
- } else {
- changedConfiguration = true;
- }
- }
- return list.toArray(array);
- }
-
- private <T> T[] add(T[] array, T object) {
- List<T> list = new ArrayList<>(Arrays.asList(array));
- list.add(object);
- changedConfiguration = true;
- return list.toArray(array);
- }
-
- private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
- InternalEventFilter ef = new InternalEventFilter();
- for (EventDispatcher ed : dispatchers) {
- String name = ed.eventName;
- if (name == null) {
- return InternalEventFilter.ACCEPT_ALL;
- }
- ef.setThreshold(name, 0);
- }
- return ef;
- }
-
- public StreamConfiguration setReuse(boolean reuse) {
- this.reuse = reuse;
- changedConfiguration = true;
- return this;
- }
-
- public StreamConfiguration setOrdered(boolean ordered) {
- this.ordered = ordered;
- changedConfiguration = true;
- return this;
- }
-
- public StreamConfiguration setEndTime(Instant endTime) {
- this.endTime = endTime;
- this.endNanos = Utils.timeToNanos(endTime);
- changedConfiguration = true;
- return this;
- }
-
- public StreamConfiguration setStartTime(Instant startTime) {
- this.startTime = startTime;
- this.startNanos = Utils.timeToNanos(startTime);
- changedConfiguration = true;
- return this;
- }
-
- public Instant getStartTime() {
- return startTime;
- }
-
- public Object getEndTime() {
- return endTime;
- }
-
- public boolean isStarted() {
- return started;
- }
-
- public StreamConfiguration setStartNanos(long startNanos) {
- this.startNanos = startNanos;
- changedConfiguration = true;
- return this;
- }
-
- public void setStarted(boolean started) {
- this.started = started;
- changedConfiguration = true;
- }
-
- public boolean hasChanged() {
- return changedConfiguration;
- }
-
- public boolean getReuse() {
- return reuse;
- }
-
- public boolean getOrdered() {
- return ordered;
- }
-
- public InternalEventFilter getFiler() {
- return eventFilter;
- }
-
- public long getStartNanos() {
- return startNanos;
- }
-
- public long getEndNanos() {
- return endNanos;
- }
-
- public InternalEventFilter getFilter() {
- return eventFilter;
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (Runnable flush : flushActions) {
- sb.append("Flush Action: ").append(flush).append("\n");
- }
- for (Runnable close : closeActions) {
- sb.append("Close Action: " + close + "\n");
- }
- for (Consumer<?> error : errorActions) {
- sb.append("Error Action: " + error + "\n");
- }
- for (EventDispatcher dispatcher : dispatchers) {
- sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
- }
- sb.append("Closed: ").append(closed).append("\n");
- sb.append("Reuse: ").append(reuse).append("\n");
- sb.append("Ordered: ").append(ordered).append("\n");
- sb.append("Started: ").append(started).append("\n");
- sb.append("Start Time: ").append(startTime).append("\n");
- sb.append("Start Nanos: ").append(startNanos).append("\n");
- sb.append("End Time: ").append(endTime).append("\n");
- sb.append("End Nanos: ").append(endNanos).append("\n");
- return sb.toString();
- }
-
- private EventDispatcher[] getDispatchers() {
- return dispatchers;
- }
- }
-
- private final static class EventDispatcher {
+ final static class EventDispatcher {
final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
- final private String eventName;
- final private Consumer<RecordedEvent> action;
+ final String eventName;
+ final Consumer<RecordedEvent> action;
public EventDispatcher(Consumer<RecordedEvent> action) {
this(null, action);
@@ -330,8 +83,9 @@
}
final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
-
- private final Thread thread;
+ private final static AtomicLong counter = new AtomicLong(1);
+ private volatile Thread thread;
+ private final Object terminated = new Object();
private final boolean active;
private final Runnable flushOperation = () -> runFlushActions();
private final AccessControlContext accessControllerContext;
@@ -340,14 +94,9 @@
// Modified by updateConfiguration()
protected volatile StreamConfiguration configuration = new StreamConfiguration();
- // Cache the last event type and dispatch.
- private EventType lastEventType;
- private EventDispatcher[] lastEventDispatch;
-
public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
this.accessControllerContext = Objects.requireNonNull(acc);
this.active = active;
- this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc));
}
@Override
@@ -360,7 +109,7 @@
abstract public void close();
// Purpose of synchronizing the following methods is
- // to serialize changes to the configuration, so only one
+ // to serialize changes to the configuration so only one
// thread at a time can change the configuration.
//
// The purpose is not to guard the configuration field. A new
@@ -455,47 +204,67 @@
}
@Override
- public final void awaitTermination() {
+ public final void awaitTermination() throws InterruptedException {
awaitTermination(Duration.ofMillis(0));
}
@Override
- public final void awaitTermination(Duration timeout) {
+ public final void awaitTermination(Duration timeout) throws InterruptedException {
Objects.requireNonNull(timeout);
- if (thread != Thread.currentThread()) {
- try {
- thread.join(timeout.toMillis());
- } catch (InterruptedException e) {
- // ignore
+ if (timeout.isNegative()) {
+ throw new IllegalArgumentException("timeout value is negative");
+ }
+
+ long base = System.currentTimeMillis();
+ long now = 0;
+
+ long millis;
+ try {
+ millis = Math.multiplyExact(timeout.getSeconds(), 1000);
+ } catch (ArithmeticException a) {
+ millis = Long.MAX_VALUE;
+ }
+ int nanos = timeout.toNanosPart();
+ if (nanos == 0 && millis == 0) {
+ synchronized (terminated) {
+ while (!isClosed()) {
+ terminated.wait(0);
+ }
+ }
+ } else {
+ while (!isClosed()) {
+ long delay = millis - now;
+ if (delay <= 0) {
+ break;
+ }
+ synchronized (terminated) {
+ terminated.wait(delay, nanos);
+ }
+ now = System.currentTimeMillis() - base;
}
}
}
protected abstract void process() throws Exception;
- protected final void clearLastDispatch() {
- lastEventDispatch = null;
- lastEventType = null;
- }
-
- protected final void dispatch(RecordedEvent event) {
+ protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
EventType type = event.getEventType();
EventDispatcher[] dispatchers = null;
- if (type == lastEventType) {
- dispatchers = lastEventDispatch;
+ if (type == c.cacheEventType) {
+ dispatchers = c.cacheDispatchers;
} else {
- dispatchers = configuration.dispatcherLookup.get(type.getId());
+ dispatchers = c.dispatcherLookup.get(type.getId());
if (dispatchers == null) {
List<EventDispatcher> list = new ArrayList<>();
- for (EventDispatcher e : configuration.getDispatchers()) {
+ for (EventDispatcher e : c.getDispatchers()) {
if (e.accepts(type)) {
list.add(e);
}
}
dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
- configuration.dispatcherLookup.put(type.getId(), dispatchers);
+ c.dispatcherLookup.put(type.getId(), dispatchers);
}
- lastEventDispatch = dispatchers;
+ c.cacheDispatchers = dispatchers;
}
for (int i = 0; i < dispatchers.length; i++) {
try {
@@ -529,11 +298,14 @@
protected final void startAsync(long startNanos) {
startInternal(startNanos);
+ Runnable r = () -> run(accessControllerContext);
+ thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r);
thread.start();
}
protected final void start(long startNanos) {
startInternal(startNanos);
+ thread = Thread.currentThread();
run(accessControllerContext);
}
@@ -580,6 +352,13 @@
defaultErrorHandler(e);
} finally {
Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
+ try {
+ close();
+ } finally {
+ synchronized (terminated) {
+ terminated.notifyAll();
+ }
+ }
}
}
@@ -591,15 +370,11 @@
}
for (int i = 0; i < consumers.length; i++) {
@SuppressWarnings("unchecked")
- Consumer<Throwable> c = (Consumer<Throwable>) consumers[i];
- c.accept(e);
+ Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
+ conusmer.accept(e);
}
}
- private void defaultErrorHandler(Throwable e) {
- e.printStackTrace();
- }
-
private void runFlushActions() {
Runnable[] flushActions = configuration.getFlushActions();
for (int i = 0; i < flushActions.length; i++) {
@@ -611,13 +386,22 @@
}
}
- private void run(AccessControlContext acc) {
+ private void run(AccessControlContext accessControlContext) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
execute();
return null;
}
- }, acc);
+ }, accessControlContext);
+ }
+
+ private String nextThreadName() {
+ counter.incrementAndGet();
+ return "JFR Event Stream " + counter;
+ }
+
+ private void defaultErrorHandler(Throwable e) {
+ e.printStackTrace();
}
}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Thu Sep 05 16:46:50 2019 +0200
@@ -54,7 +54,7 @@
static final byte CHECKPOINT_CHUNK_HEADER_MASK = 2;
// Checkpoint contains only statics that will not change from chunk to chunk
static final byte CHECKPOINT_STATICS_MASK = 4;
- // Checkpoint contains thread realted information
+ // Checkpoint contains thread related information
static final byte CHECKPOINT_THREADS_MASK = 8;
private static final long CONSTANT_POOL_TYPE_ID = 1;
@@ -209,7 +209,7 @@
private void parseCheckpoint() throws IOException {
// Content has been parsed previously. This
- // is for triggering flsuh
+ // is to trigger flush
input.readLong(); // timestamp
input.readLong(); // duration
input.readLong(); // delta
@@ -449,6 +449,4 @@
public long getStartNanos() {
return chunkHeader.getStartNanos();
}
-
-
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java Thu Sep 05 16:46:50 2019 +0200
@@ -3,7 +3,6 @@
import jdk.jfr.internal.Type;
final class ConstantLookup {
-
private final Type type;
private ConstantMap current;
private ConstantMap previous = ConstantMap.EMPTY;
@@ -24,7 +23,6 @@
public void newPool() {
previous = current;
current = new ConstantMap(current.factory, current.name);
- // previous = new ConstantMap(); // disable cache
}
public Object getPreviousResolved(long key) {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Sep 05 16:46:50 2019 +0200
@@ -50,7 +50,7 @@
private long chunkStartNanos;
private RecordedEvent[] sortedList;
- public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
+ EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
super(acc, active);
this.fileAccess = Objects.requireNonNull(fileAccess);
this.active = active;
@@ -75,7 +75,7 @@
}
@Override
- public void process() throws Exception {
+ protected void process() throws Exception {
StreamConfiguration c = configuration;
Path path;
boolean validStartTime = active || c.getStartTime() != null;
@@ -106,11 +106,11 @@
chunkParser.resetEventCache();
chunkParser.setParserFilter(c.getFilter());
chunkParser.updateEventParsers();
- clearLastDispatch();
+ c.clearDispatchCache();
if (ordered) {
- awaitnewEvent = processOrdered(awaitnewEvent);
+ awaitnewEvent = processOrdered(c, awaitnewEvent);
} else {
- awaitnewEvent = processUnordered(awaitnewEvent);
+ awaitnewEvent = processUnordered(c, awaitnewEvent);
}
if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
close();
@@ -135,7 +135,7 @@
}
}
- private boolean processOrdered(boolean awaitNewEvents) throws IOException {
+ private boolean processOrdered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[100_000];
}
@@ -164,18 +164,18 @@
Arrays.sort(sortedList, 0, index, END_TIME);
}
for (int i = 0; i < index; i++) {
- dispatch(sortedList[i]);
+ dispatch(c, sortedList[i]);
}
return awaitNewEvents;
}
- private boolean processUnordered(boolean awaitNewEvents) throws IOException {
+ private boolean processUnordered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
while (true) {
RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
if (e == null) {
return true;
} else {
- dispatch(e);
+ dispatch(c, e);
}
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu Sep 05 16:46:50 2019 +0200
@@ -43,7 +43,7 @@
private ChunkParser chunkParser;
private RecordedEvent[] sortedList;
- public EventFileStream(AccessControlContext acc, Path path) throws IOException {
+ EventFileStream(AccessControlContext acc, Path path) throws IOException {
super(acc, false);
Objects.requireNonNull(path);
this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
@@ -71,7 +71,7 @@
}
@Override
- public void process() throws IOException {
+ protected void process() throws IOException {
StreamConfiguration c = configuration;
long start = 0;
long end = Long.MAX_VALUE;
@@ -98,11 +98,11 @@
chunkParser.resetEventCache();
chunkParser.setParserFilter(c.getFiler());
chunkParser.updateEventParsers();
- clearLastDispatch();
+ c.clearDispatchCache();
if (ordered) {
- processOrdered();
+ processOrdered(c);
} else {
- processUnordered();
+ processUnordered(c);
}
if (chunkParser.isLastChunk()) {
return;
@@ -111,7 +111,7 @@
}
}
- private void processOrdered() throws IOException {
+ private void processOrdered(StreamConfiguration c) throws IOException {
if (sortedList == null) {
sortedList = new RecordedEvent[10_000];
}
@@ -122,7 +122,7 @@
if (event == null) {
Arrays.sort(sortedList, 0, index, END_TIME);
for (int i = 0; i < index; i++) {
- dispatch(sortedList[i]);
+ dispatch(c, sortedList[i]);
}
return;
}
@@ -135,13 +135,13 @@
}
}
- private void processUnordered() throws IOException {
+ private void processUnordered(StreamConfiguration c) throws IOException {
while (!isClosed()) {
RecordedEvent event = chunkParser.readEvent();
if (event == null) {
return;
}
- dispatch(event);
+ dispatch(c, event);
}
}
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java Thu Sep 05 16:46:50 2019 +0200
@@ -50,9 +50,10 @@
private final RecordedEvent unorderedEvent;
private final ObjectContext objectContext;
+ private RecordedEvent[] cached;
+ private int cacheIndex;
+
private boolean enabled = true;
- private RecordedEvent[] eventCache;
- private int index;
private boolean ordered;
private long filterStart;
private long filterEnd = Long.MAX_VALUE;
@@ -72,17 +73,17 @@
private RecordedEvent cachedEvent() {
if (ordered) {
- if (index == eventCache.length) {
- RecordedEvent[] cache = eventCache;
- eventCache = new RecordedEvent[eventCache.length * 2];
- System.arraycopy(cache, 0, eventCache, 0, cache.length);
+ if (cacheIndex == cached.length) {
+ RecordedEvent[] old = cached;
+ cached = new RecordedEvent[cached.length * 2];
+ System.arraycopy(old, 0, cached, 0, old.length);
}
- RecordedEvent event = eventCache[index];
+ RecordedEvent event = cached[cacheIndex];
if (event == null) {
event = new RecordedEvent(objectContext, new Object[length], 0L, 0L);
- eventCache[index] = event;
+ cached[cacheIndex] = event;
}
- index++;
+ cacheIndex++;
return event;
} else {
return unorderedEvent;
@@ -131,7 +132,7 @@
}
}
- if (eventCache != null) {
+ if (cached != null) {
RecordedEvent event = cachedEvent();
event.startTimeTicks = startTicks;
event.endTimeTicks = endTicks;
@@ -155,11 +156,11 @@
}
public void resetCache() {
- index = 0;
+ cacheIndex = 0;
}
public boolean hasReuse() {
- return eventCache != null;
+ return cached != null;
}
public void setReuse(boolean reuse) {
@@ -167,10 +168,10 @@
return;
}
if (reuse) {
- eventCache = new RecordedEvent[2];
- index = 0;
+ cached = new RecordedEvent[2];
+ cacheIndex = 0;
} else {
- eventCache = null;
+ cached = null;
}
}
@@ -187,7 +188,6 @@
return;
}
this.ordered = ordered;
- this.index = 0;
+ this.cacheIndex = 0;
}
-
}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Thu Sep 05 16:46:50 2019 +0200
@@ -173,10 +173,11 @@
*
* @return {@code true} if the action was removed, {@code false} otherwise
*
- * @see #onClose(Runnable)
- * @see #onFlush(Runnable)
* @see #onEvent(Consumer)
* @see #onEvent(String, Consumer)
+ * @see #onFlush(Runnable)
+ * @see #onClose(Runnable)
+ * @see #onError(Consumer)
*/
boolean remove(Object action);
@@ -252,16 +253,21 @@
*
* @param timeout the maximum time to wait, not {@code null}
*
+ * @throws IllegalArgumentException if timeout is negative
+ * @throws InterruptedException
+ *
* @see #start()
* @see #startAsync()
*/
- void awaitTermination(Duration timeout);
+ void awaitTermination(Duration timeout) throws InterruptedException;
/**
* Blocks the current thread until the stream is finished or closed.
*
+ * @throws InterruptedException
+ *
* @see #start()
* @see #startAsync()
*/
- void awaitTermination();
+ void awaitTermination() throws InterruptedException;
}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ObjectContext.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ObjectContext.java Thu Sep 05 16:46:50 2019 +0200
@@ -45,19 +45,19 @@
this.timeConverter = timeConverter;
}
- private ObjectContext(ObjectContext parent, ValueDescriptor desc) {
+ private ObjectContext(ObjectContext parent, ValueDescriptor descriptor) {
this.eventType = parent.eventType;
this.contextLookup = parent.contextLookup;
this.timeConverter = parent.timeConverter;
- this.fields = desc.getFields();
+ this.fields = descriptor.getFields();
}
- public ObjectContext getInstance(ValueDescriptor desc) {
- ObjectContext h = contextLookup.get(desc);
- if (h == null) {
- h = new ObjectContext(this, desc);
- contextLookup.put(desc, h);
+ public ObjectContext getInstance(ValueDescriptor descriptor) {
+ ObjectContext context = contextLookup.get(descriptor);
+ if (context == null) {
+ context = new ObjectContext(this, descriptor);
+ contextLookup.put(descriptor, context);
}
- return h;
+ return context;
}
}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Thu Sep 05 16:46:50 2019 +0200
@@ -347,12 +347,12 @@
}
@Override
- public void awaitTermination(Duration timeout) {
+ public void awaitTermination(Duration timeout) throws InterruptedException {
directoryStream.awaitTermination(timeout);
}
@Override
- public void awaitTermination() {
+ public void awaitTermination() throws InterruptedException {
directoryStream.awaitTermination();
}
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java Thu Sep 05 16:46:50 2019 +0200
@@ -0,0 +1,268 @@
+package jdk.jfr.consumer;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+
+import jdk.jfr.EventType;
+import jdk.jfr.consumer.AbstractEventStream.EventDispatcher;
+import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.Utils;
+import jdk.jfr.internal.consumer.InternalEventFilter;
+
+final class StreamConfiguration {
+ private static final Runnable[] NO_ACTIONS = new Runnable[0];
+
+ Consumer<?>[] errorActions = new Consumer<?>[0];
+ private Runnable[] flushActions = NO_ACTIONS;
+ private Runnable[] closeActions = NO_ACTIONS;
+ private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
+ private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
+ LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+
+ private boolean changedConfiguration = false;
+ private boolean closed = false;
+ private boolean reuse = true;
+ private boolean ordered = true;
+ private Instant startTime = null;
+ private Instant endTime = null;
+ private boolean started = false;
+ private long startNanos = 0;
+ private long endNanos = Long.MAX_VALUE;
+
+ // Cache the last event type and dispatch.
+ EventType cacheEventType;
+ EventDispatcher[] cacheDispatchers;
+
+
+ public StreamConfiguration(StreamConfiguration configuration) {
+ this.flushActions = configuration.flushActions;
+ this.closeActions = configuration.closeActions;
+ this.errorActions = configuration.errorActions;
+ this.dispatchers = configuration.dispatchers;
+ this.eventFilter = configuration.eventFilter;
+ this.closed = configuration.closed;
+ this.reuse = configuration.reuse;
+ this.ordered = configuration.ordered;
+ this.startTime = configuration.startTime;
+ this.endTime = configuration.endTime;
+ this.started = configuration.started;
+ this.startNanos = configuration.startNanos;
+ this.endNanos = configuration.endNanos;
+ this.dispatcherLookup = configuration.dispatcherLookup;
+ }
+
+ public StreamConfiguration() {
+ }
+
+ public StreamConfiguration remove(Object action) {
+ flushActions = remove(flushActions, action);
+ closeActions = remove(closeActions, action);
+ errorActions = remove(errorActions, action);
+ dispatchers = removeDispatch(dispatchers, action);
+ return this;
+ }
+
+ public StreamConfiguration addDispatcher(EventDispatcher e) {
+ dispatchers = add(dispatchers, e);
+ eventFilter = buildFilter(dispatchers);
+ dispatcherLookup = new LongMap<>();
+ return this;
+ }
+
+ public StreamConfiguration addFlushAction(Runnable action) {
+ flushActions = add(flushActions, action);
+ return this;
+ }
+
+ public StreamConfiguration addCloseAction(Runnable action) {
+ closeActions = add(closeActions, action);
+ return this;
+ }
+
+ public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
+ errorActions = add(errorActions, action);
+ return this;
+ }
+
+ public StreamConfiguration setClosed(boolean closed) {
+ this.closed = closed;
+ changedConfiguration = true;
+ return this;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public Runnable[] getCloseActions() {
+ return closeActions;
+ }
+
+ public Runnable[] getFlushActions() {
+ return flushActions;
+ }
+
+ private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
+ List<EventDispatcher> list = new ArrayList<>(array.length);
+ boolean modified = false;
+ for (int i = 0; i < array.length; i++) {
+ if (array[i].action != action) {
+ list.add(array[i]);
+ } else {
+ modified = true;
+ }
+ }
+ EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
+ if (modified) {
+ eventFilter = buildFilter(result);
+ dispatcherLookup = new LongMap<>();
+ changedConfiguration = true;
+ }
+ return result;
+ }
+
+ private <T> T[] remove(T[] array, Object action) {
+ List<T> list = new ArrayList<>(array.length);
+ for (int i = 0; i < array.length; i++) {
+ if (array[i] != action) {
+ list.add(array[i]);
+ } else {
+ changedConfiguration = true;
+ }
+ }
+ return list.toArray(array);
+ }
+
+ private <T> T[] add(T[] array, T object) {
+ List<T> list = new ArrayList<>(Arrays.asList(array));
+ list.add(object);
+ changedConfiguration = true;
+ return list.toArray(array);
+ }
+
+ private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
+ InternalEventFilter ef = new InternalEventFilter();
+ for (EventDispatcher ed : dispatchers) {
+ String name = ed.eventName;
+ if (name == null) {
+ return InternalEventFilter.ACCEPT_ALL;
+ }
+ ef.setThreshold(name, 0);
+ }
+ return ef;
+ }
+
+ public StreamConfiguration setReuse(boolean reuse) {
+ this.reuse = reuse;
+ changedConfiguration = true;
+ return this;
+ }
+
+ public StreamConfiguration setOrdered(boolean ordered) {
+ this.ordered = ordered;
+ changedConfiguration = true;
+ return this;
+ }
+
+ public StreamConfiguration setEndTime(Instant endTime) {
+ this.endTime = endTime;
+ this.endNanos = Utils.timeToNanos(endTime);
+ changedConfiguration = true;
+ return this;
+ }
+
+ public StreamConfiguration setStartTime(Instant startTime) {
+ this.startTime = startTime;
+ this.startNanos = Utils.timeToNanos(startTime);
+ changedConfiguration = true;
+ return this;
+ }
+
+ public Instant getStartTime() {
+ return startTime;
+ }
+
+ public Object getEndTime() {
+ return endTime;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public StreamConfiguration setStartNanos(long startNanos) {
+ this.startNanos = startNanos;
+ changedConfiguration = true;
+ return this;
+ }
+
+ public void setStarted(boolean started) {
+ this.started = started;
+ changedConfiguration = true;
+ }
+
+ public boolean hasChanged() {
+ return changedConfiguration;
+ }
+
+ public boolean getReuse() {
+ return reuse;
+ }
+
+ public boolean getOrdered() {
+ return ordered;
+ }
+
+ public InternalEventFilter getFiler() {
+ return eventFilter;
+ }
+
+ public long getStartNanos() {
+ return startNanos;
+ }
+
+ public long getEndNanos() {
+ return endNanos;
+ }
+
+ public InternalEventFilter getFilter() {
+ return eventFilter;
+ }
+
+ public void clearDispatchCache() {
+ cacheDispatchers = null;
+ cacheEventType = null;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (Runnable flush : flushActions) {
+ sb.append("Flush Action: ").append(flush).append("\n");
+ }
+ for (Runnable close : closeActions) {
+ sb.append("Close Action: " + close + "\n");
+ }
+ for (Consumer<?> error : errorActions) {
+ sb.append("Error Action: " + error + "\n");
+ }
+ for (EventDispatcher dispatcher : dispatchers) {
+ sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
+ }
+ sb.append("Closed: ").append(closed).append("\n");
+ sb.append("Reuse: ").append(reuse).append("\n");
+ sb.append("Ordered: ").append(ordered).append("\n");
+ sb.append("Started: ").append(started).append("\n");
+ sb.append("Start Time: ").append(startTime).append("\n");
+ sb.append("Start Nanos: ").append(startNanos).append("\n");
+ sb.append("End Time: ").append(endTime).append("\n");
+ sb.append("End Nanos: ").append(endNanos).append("\n");
+ return sb.toString();
+ }
+
+ EventDispatcher[] getDispatchers() {
+ return dispatchers;
+ }
+}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/StringParser.java Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/StringParser.java Thu Sep 05 16:46:50 2019 +0200
@@ -36,7 +36,6 @@
private final static Charset UTF8 = Charset.forName("UTF-8");
private final static Charset LATIN1 = Charset.forName("ISO-8859-1");
-
final static class CharsetParser extends Parser {
private final Charset charset;
private int lastSize;
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestAwaitTermination.java Tue Sep 03 22:54:46 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestAwaitTermination.java Thu Sep 05 16:46:50 2019 +0200
@@ -50,7 +50,11 @@
try (RecordingStream r = new RecordingStream()) {
r.startAsync();
var c = CompletableFuture.runAsync(() -> {
- r.awaitTermination();
+ try {
+ r.awaitTermination();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
});
r.close();
c.get();
@@ -61,7 +65,11 @@
try (RecordingStream r = new RecordingStream()) {
r.startAsync();
var c = CompletableFuture.runAsync(() -> {
- r.awaitTermination(Duration.ofMillis(10));
+ try {
+ r.awaitTermination(Duration.ofMillis(10));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
});
c.get();
r.close();