--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri May 17 15:53:21 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri May 17 16:02:27 2019 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.StringJoiner;
import jdk.jfr.EventType;
import jdk.jfr.internal.LogLevel;
@@ -35,7 +36,9 @@
import jdk.jfr.internal.Logger;
import jdk.jfr.internal.MetadataDescriptor;
import jdk.jfr.internal.Type;
+import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.Parser;
import jdk.jfr.internal.consumer.RecordingInput;
/**
@@ -45,38 +48,127 @@
final class ChunkParser {
private static final long CONSTANT_POOL_TYPE_ID = 1;
private final RecordingInput input;
- private final LongMap<Parser> parsers;
private final ChunkHeader chunkHeader;
- private final long absoluteChunkEnd;
private final MetadataDescriptor metadata;
- private final LongMap<Type> typeMap;
private final TimeConverter timeConverter;
+ private final MetadataDescriptor previousMetadata;
+ private final long pollInterval;
+ private final LongMap<ConstantLookup> constantLookups;
+
+ private LongMap<Type> typeMap;
+ private LongMap<Parser> parsers;
+ private boolean chunkFinished;
+ private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
public ChunkParser(RecordingInput input) throws IOException {
- this(new ChunkHeader(input));
+ this(new ChunkHeader(input), null, 500);
}
- private ChunkParser(ChunkHeader header) throws IOException {
+ private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException {
this.input = header.getInput();
this.chunkHeader = header;
- this.metadata = header.readMetadata();
- this.absoluteChunkEnd = header.getEnd();
+ if (previous == null) {
+ this.pollInterval = 500;
+ this.constantLookups = new LongMap<>();
+ this.previousMetadata = null;
+ } else {
+ this.constantLookups = previous.constantLookups;
+ this.previousMetadata = previous.metadata;
+ this.pollInterval = previous.pollInterval;
+ }
+ this.metadata = header.readMetadata(previousMetadata);
this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset());
-
- ParserFactory factory = new ParserFactory(metadata, timeConverter);
- LongMap<ConstantMap> constantPools = factory.getConstantPools();
- parsers = factory.getParsers();
- typeMap = factory.getTypeMap();
-
- fillConstantPools(parsers, constantPools);
- constantPools.forEach(ConstantMap::setIsResolving);
- constantPools.forEach(ConstantMap::resolve);
- constantPools.forEach(ConstantMap::setResolved);
+ if (metadata != previousMetadata) {
+ ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
+ parsers = factory.getParsers();
+ typeMap = factory.getTypeMap();
+ } else {
+ parsers = previous.parsers;
+ typeMap = previous.typeMap;
+ }
+ constantLookups.forEach(c -> c.newPool());
+ fillConstantPools(0);
+ constantLookups.forEach(c -> c.getLatestPool().setResolving());
+ constantLookups.forEach(c -> c.getLatestPool().resolve());
+ constantLookups.forEach(c -> c.getLatestPool().setResolved());
input.position(chunkHeader.getEventStart());
}
+ public void setParserFilter(InternalEventFilter filter) {
+ // Disable low level filter, since it doesn't work
+ // when a psrser is shared
+ // this.eventFilter = filter;
+ // updateParserFilters();
+ }
+
+ public InternalEventFilter getEventFilter() {
+ return this.eventFilter;
+ }
+
+ private void updateParserFilters() {
+ parsers.forEach(p -> {
+ if (p instanceof EventParser) {
+ EventParser ep = (EventParser) p;
+ long threshold = eventFilter.getThreshold(ep.getEventType().getName());
+ if (threshold >= 0) {
+ ep.setEnabled(true);
+ ep.setThreshold(timeConverter.convertDurationNanos(threshold));
+ } else {
+ ep.setThreshold(-1L);
+ }
+ }
+ });
+ }
+
+ /**
+ * Reads an event and returns null when segment or chunk ends.
+ */
+ public RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException {
+ long absoluteChunkEnd = chunkHeader.getEnd();
+ while (true) {
+ RecordedEvent event = readEvent();
+ if (event != null) {
+ return event;
+ }
+ if (!awaitNewEvents) {
+ return null;
+ }
+ long lastValid = absoluteChunkEnd;
+ long metadataPoistion = chunkHeader.getMetataPosition();
+ long contantPosition = chunkHeader.getConstantPoolPosition();
+ chunkFinished = awaitUpdatedHeader(absoluteChunkEnd);
+ if (chunkFinished) {
+ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end");
+ return null;
+ }
+ absoluteChunkEnd = chunkHeader.getEnd();
+ // Read metadata and constant pools for the next segment
+ if (chunkHeader.getMetataPosition() != metadataPoistion) {
+ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers");
+ MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata);
+ ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
+ parsers = factory.getParsers();
+ typeMap = factory.getTypeMap();
+ updateParserFilters();
+ }
+ if (contantPosition != chunkHeader.getConstantPoolPosition()) {
+ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
+ constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false));
+ fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart());
+ constantLookups.forEach(c -> c.getLatestPool().setResolving());
+ constantLookups.forEach(c -> c.getLatestPool().resolve());
+ constantLookups.forEach(c -> c.getLatestPool().setResolved());
+ }
+ input.position(lastValid);
+ }
+ }
+
+ /**
+ * Reads an event and returns null when the chunk ends
+ */
public RecordedEvent readEvent() throws IOException {
+ long absoluteChunkEnd = chunkHeader.getEnd();
while (input.position() < absoluteChunkEnd) {
long pos = input.position();
int size = input.readInt();
@@ -84,10 +176,16 @@
throw new IOException("Event can't have zero size");
}
long typeId = input.readLong();
- if (typeId > CONSTANT_POOL_TYPE_ID) { // also skips metadata (id=0)
- Parser ep = parsers.get(typeId);
- if (ep instanceof EventParser) {
- return (RecordedEvent) ep.parse(input);
+ // Skip metadata and constant pool events (id = 0, id = 1)
+ if (typeId > CONSTANT_POOL_TYPE_ID) {
+ Parser p = parsers.get(typeId);
+ if (p instanceof EventParser) {
+ EventParser ep = (EventParser) p;
+ RecordedEvent event = ep.parse(input);
+ if (event != null) {
+ input.position(pos + size);
+ return event;
+ }
}
}
input.position(pos + size);
@@ -95,62 +193,132 @@
return null;
}
- private void fillConstantPools(LongMap<Parser> typeParser, LongMap<ConstantMap> constantPools) throws IOException {
- long nextCP = chunkHeader.getAbsoluteChunkStart();
- long deltaToNext = chunkHeader.getConstantPoolPosition();
- while (deltaToNext != 0) {
- nextCP += deltaToNext;
- input.position(nextCP);
- final long position = nextCP;
+ private boolean awaitUpdatedHeader(long absoluteChunkEnd) throws IOException {
+ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
+ while (true) {
+ chunkHeader.refresh();
+ if (absoluteChunkEnd != chunkHeader.getEnd()) {
+ return false;
+ }
+ if (chunkHeader.isFinished()) {
+ return true;
+ }
+ Utils.takeNap(pollInterval);
+ }
+ }
+
+ private void fillConstantPools(long abortCP) throws IOException {
+ long thisCP = chunkHeader.getConstantPoolPosition() + chunkHeader.getAbsoluteChunkStart();
+ long lastCP = -1;
+ long delta = -1;
+ boolean log = Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE);
+ while (thisCP != abortCP && delta != 0) {
+ input.position(thisCP);
+ lastCP = thisCP;
int size = input.readInt(); // size
long typeId = input.readLong();
if (typeId != CONSTANT_POOL_TYPE_ID) {
- throw new IOException("Expected check point event (id = 1) at position " + nextCP + ", but found type id = " + typeId);
+ throw new IOException("Expected check point event (id = 1) at position " + lastCP + ", but found type id = " + typeId);
}
input.readLong(); // timestamp
input.readLong(); // duration
- deltaToNext = input.readLong();
- final long delta = deltaToNext;
+ delta = input.readLong();
+ thisCP += delta;
boolean flush = input.readBoolean();
int poolCount = input.readInt();
+ final long logLastCP = lastCP;
+ final long logDelta = delta;
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> {
- return "New constant pool: startPosition=" + position + ", size=" + size + ", deltaToNext=" + delta + ", flush=" + flush + ", poolCount=" + poolCount;
+ return "New constant pool: startPosition=" + logLastCP + ", size=" + size + ", deltaToNext=" + logDelta + ", flush=" + flush + ", poolCount=" + poolCount;
});
-
for (int i = 0; i < poolCount; i++) {
long id = input.readLong(); // type id
- ConstantMap pool = constantPools.get(id);
+ ConstantLookup lookup = constantLookups.get(id);
Type type = typeMap.get(id);
- if (pool == null) {
+ if (lookup == null) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found constant pool(" + id + ") that is never used");
if (type == null) {
- throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + nextCP + ", " + nextCP + size + "]");
+ throw new IOException(
+ "Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + lastCP + ", " + lastCP + size + "]");
}
- pool = new ConstantMap(ObjectFactory.create(type, timeConverter), type.getName());
- constantPools.put(type.getId(), pool);
+ ConstantMap pool = new ConstantMap(ObjectFactory.create(type, timeConverter), type.getName());
+ constantLookups.put(type.getId(), new ConstantLookup(pool, type));
}
- Parser parser = typeParser.get(id);
+ Parser parser = parsers.get(id);
if (parser == null) {
throw new IOException("Could not find constant pool type with id = " + id);
}
try {
int count = input.readInt();
- Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> "Constant: " + getName(id) + "[" + count + "]");
+ if (count == 0) {
+ throw new InternalError("Pool " + type.getName() + " must contain at least one element ");
+ }
+ if (log) {
+ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant Pool " + i + ": " + type.getName());
+ }
for (int j = 0; j < count; j++) {
long key = input.readLong();
- Object value = parser.parse(input);
- pool.put(key, value);
+// Object resolved = lookup.getCurrent(key);
+// Disable cache Object resolved = lookup.getResolved(key);
+// if (resolved == null) {
+ Object v = parser.parse(input);
+ logConstant(key, v, false);
+ lookup.getLatestPool().put(key, v);
+// } else {
+// parser.skip(input);
+// logConstant(key, resolved, true);
+// Disable cache lookup.getLatestPool().putResolved(key, resolved);
+// }
}
} catch (Exception e) {
- throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + nextCP + ", " + nextCP + size + "]", e);
+ throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + lastCP + ", " + lastCP + size + "]",
+ e);
}
}
- if (input.position() != nextCP + size) {
+ if (input.position() != lastCP + size) {
throw new IOException("Size of check point event doesn't match content");
}
}
}
+ private void logConstant(long key, Object v, boolean preresolved) {
+ if (!Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE)) {
+ return;
+ }
+ String valueText;
+ if (v.getClass().isArray()) {
+ Object[] array = (Object[]) v;
+ StringJoiner sj = new StringJoiner(", ", "{", "}");
+ for (int i = 0; i < array.length; i++) {
+ sj.add(textify(array[i]));
+ }
+ valueText = sj.toString();
+ } else {
+ valueText = textify(v);
+ }
+ String suffix = preresolved ? " (presolved)" :"";
+ Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant: " + key + " = " + valueText + suffix);
+ }
+
+ private String textify(Object o) {
+ if (o == null) { // should not happen
+ return "null";
+ }
+ if (o instanceof String) {
+ return "\"" + String.valueOf(o) + "\"";
+ }
+ if (o instanceof RecordedObject) {
+ return o.getClass().getName();
+ }
+ if (o.getClass().isArray()) {
+ Object[] array = (Object[]) o;
+ if (array.length > 0) {
+ return textify(array[0]) + "[]"; // can it be recursive?
+ }
+ }
+ return String.valueOf(o);
+ }
+
private String getName(long id) {
Type type = typeMap.get(id);
return type == null ? ("unknown(" + id + ")") : type.getName();
@@ -164,11 +332,15 @@
return metadata.getEventTypes();
}
- public boolean isLastChunk() {
+ public boolean isLastChunk() throws IOException {
return chunkHeader.isLastChunk();
}
public ChunkParser nextChunkParser() throws IOException {
- return new ChunkParser(chunkHeader.nextHeader());
+ return new ChunkParser(chunkHeader.nextHeader(), this, pollInterval);
+ }
+
+ public boolean isChunkFinished() {
+ return chunkFinished;
}
}