diff -r 2c3cc4b01880 -r c16ac7a2eba4 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java Wed Oct 30 19:43:52 2019 +0100 @@ -0,0 +1,451 @@ +/* + * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.jfr.internal.consumer; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.StringJoiner; + +import jdk.jfr.EventType; +import jdk.jfr.consumer.RecordedEvent; +import jdk.jfr.consumer.RecordedObject; +import jdk.jfr.internal.LogLevel; +import jdk.jfr.internal.LogTag; +import jdk.jfr.internal.Logger; +import jdk.jfr.internal.LongMap; +import jdk.jfr.internal.MetadataDescriptor; +import jdk.jfr.internal.Type; +import jdk.jfr.internal.Utils; + +/** + * Parses a chunk. + * + */ +public final class ChunkParser { + + static final class ParserConfiguration { + private final boolean reuse; + private final boolean ordered; + private final ParserFilter eventFilter; + + long filterStart; + long filterEnd; + + ParserConfiguration(long filterStart, long filterEnd, boolean reuse, boolean ordered, ParserFilter filter) { + this.filterStart = filterStart; + this.filterEnd = filterEnd; + this.reuse = reuse; + this.ordered = ordered; + this.eventFilter = filter; + } + + public ParserConfiguration() { + this(0, Long.MAX_VALUE, false, false, ParserFilter.ACCEPT_ALL); + } + + public boolean isOrdered() { + return ordered; + } + } + + private enum CheckPointType { + // Checkpoint that finishes a flush segment + FLUSH(1), + // Checkpoint contains chunk header information in the first pool + CHUNK_HEADER(2), + // Checkpoint contains only statics that will not change from chunk to chunk + STATICS(4), + // Checkpoint contains thread related information + THREAD(8); + private final int mask; + private CheckPointType(int mask) { + this.mask = mask; + } + + private boolean is(int flags) { + return (mask & flags) != 0; + } + } + + private static final long CONSTANT_POOL_TYPE_ID = 1; + private static final String CHUNKHEADER = "jdk.types.ChunkHeader"; + private final RecordingInput input; + private final ChunkHeader chunkHeader; + private final MetadataDescriptor metadata; + private final TimeConverter timeConverter; + private final MetadataDescriptor previousMetadata; + private final LongMap constantLookups; + + private LongMap typeMap; + private LongMap parsers; + private boolean chunkFinished; + + private Runnable flushOperation; + private ParserConfiguration configuration; + + public ChunkParser(RecordingInput input) throws IOException { + this(input, new ParserConfiguration()); + } + + ChunkParser(RecordingInput input, ParserConfiguration pc) throws IOException { + this(new ChunkHeader(input), null, pc); + } + + private ChunkParser(ChunkParser previous) throws IOException { + this(new ChunkHeader(previous.input), previous, new ParserConfiguration()); + } + + private ChunkParser(ChunkHeader header, ChunkParser previous, ParserConfiguration pc) throws IOException { + this.configuration = pc; + this.input = header.getInput(); + this.chunkHeader = header; + if (previous == null) { + this.constantLookups = new LongMap<>(); + this.previousMetadata = null; + } else { + this.constantLookups = previous.constantLookups; + this.previousMetadata = previous.metadata; + this.configuration = previous.configuration; + } + this.metadata = header.readMetadata(previousMetadata); + this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset()); + if (metadata != previousMetadata) { + ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); + parsers = factory.getParsers(); + typeMap = factory.getTypeMap(); + updateConfiguration(); + } else { + parsers = previous.parsers; + typeMap = previous.typeMap; + } + constantLookups.forEach(c -> c.newPool()); + fillConstantPools(0); + constantLookups.forEach(c -> c.getLatestPool().setResolving()); + constantLookups.forEach(c -> c.getLatestPool().resolve()); + constantLookups.forEach(c -> c.getLatestPool().setResolved()); + + input.position(chunkHeader.getEventStart()); + } + + public ChunkParser nextChunkParser() throws IOException { + return new ChunkParser(chunkHeader.nextHeader(), this, configuration); + } + + private void updateConfiguration() { + updateConfiguration(configuration, false); + } + + void updateConfiguration(ParserConfiguration configuration, boolean resetEventCache) { + this.configuration = configuration; + parsers.forEach(p -> { + if (p instanceof EventParser) { + EventParser ep = (EventParser) p; + if (resetEventCache) { + ep.resetCache(); + } + String name = ep.getEventType().getName(); + ep.setOrdered(configuration.ordered); + ep.setReuse(configuration.reuse); + ep.setFilterStart(configuration.filterStart); + ep.setFilterEnd(configuration.filterEnd); + long threshold = configuration.eventFilter.getThreshold(name); + if (threshold >= 0) { + ep.setEnabled(true); + ep.setThresholdNanos(threshold); + } else { + ep.setEnabled(false); + ep.setThresholdNanos(Long.MAX_VALUE); + } + } + }); + } + + /** + * Reads an event and returns null when segment or chunk ends. + * + * @param awaitNewEvents wait for new data. + */ + RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException { + long absoluteChunkEnd = chunkHeader.getEnd(); + while (true) { + RecordedEvent event = readEvent(); + if (event != null) { + return event; + } + if (!awaitNewEvents) { + return null; + } + long lastValid = absoluteChunkEnd; + long metadataPoistion = chunkHeader.getMetataPosition(); + long contantPosition = chunkHeader.getConstantPoolPosition(); + chunkFinished = awaitUpdatedHeader(absoluteChunkEnd); + if (chunkFinished) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end"); + return null; + } + absoluteChunkEnd = chunkHeader.getEnd(); + // Read metadata and constant pools for the next segment + if (chunkHeader.getMetataPosition() != metadataPoistion) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers"); + MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata); + ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); + parsers = factory.getParsers(); + typeMap = factory.getTypeMap(); + updateConfiguration();; + } + if (contantPosition != chunkHeader.getConstantPoolPosition()) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values"); + constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false)); + fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart()); + constantLookups.forEach(c -> c.getLatestPool().setResolving()); + constantLookups.forEach(c -> c.getLatestPool().resolve()); + constantLookups.forEach(c -> c.getLatestPool().setResolved()); + } + input.position(lastValid); + } + } + + /** + * Reads an event and returns null when the chunk ends + */ + public RecordedEvent readEvent() throws IOException { + long absoluteChunkEnd = chunkHeader.getEnd(); + while (input.position() < absoluteChunkEnd) { + long pos = input.position(); + int size = input.readInt(); + if (size == 0) { + throw new IOException("Event can't have zero size"); + } + long typeId = input.readLong(); + Parser p = parsers.get(typeId); + if (p instanceof EventParser) { + // Fast path + EventParser ep = (EventParser) p; + RecordedEvent event = ep.parse(input); + if (event != null) { + input.position(pos + size); + return event; + } + // Not accepted by filter + } else { + if (typeId == 1) { // checkpoint event + if (flushOperation != null) { + parseCheckpoint(); + } + } else { + if (typeId != 0) { // Not metadata event + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Unknown event type " + typeId); + } + } + } + input.position(pos + size); + } + return null; + } + + private void parseCheckpoint() throws IOException { + // Content has been parsed previously. This + // is to trigger flush + input.readLong(); // timestamp + input.readLong(); // duration + input.readLong(); // delta + byte typeFlags = input.readByte(); + if (CheckPointType.FLUSH.is(typeFlags)) { + flushOperation.run(); + } + } + + private boolean awaitUpdatedHeader(long absoluteChunkEnd) throws IOException { + if (Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO)) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes"); + } + while (true) { + chunkHeader.refresh(); + if (absoluteChunkEnd != chunkHeader.getEnd()) { + return false; + } + if (chunkHeader.isFinished()) { + return true; + } + Utils.waitFlush(1000); + } + } + + private void fillConstantPools(long abortCP) throws IOException { + long thisCP = chunkHeader.getConstantPoolPosition() + chunkHeader.getAbsoluteChunkStart(); + long lastCP = -1; + long delta = -1; + boolean logTrace = Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE); + while (thisCP != abortCP && delta != 0) { + input.position(thisCP); + lastCP = thisCP; + int size = input.readInt(); // size + long typeId = input.readLong(); + if (typeId != CONSTANT_POOL_TYPE_ID) { + throw new IOException("Expected check point event (id = 1) at position " + lastCP + ", but found type id = " + typeId); + } + input.readLong(); // timestamp + input.readLong(); // duration + delta = input.readLong(); + thisCP += delta; + boolean flush = input.readBoolean(); + int poolCount = input.readInt(); + final long logLastCP = lastCP; + final long logDelta = delta; + if (logTrace) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> { + return "New constant pool: startPosition=" + logLastCP + ", size=" + size + ", deltaToNext=" + logDelta + ", flush=" + flush + ", poolCount=" + poolCount; + }); + } + for (int i = 0; i < poolCount; i++) { + long id = input.readLong(); // type id + ConstantLookup lookup = constantLookups.get(id); + Type type = typeMap.get(id); + if (lookup == null) { + if (type == null) { + throw new IOException( + "Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + lastCP + ", " + lastCP + size + "]"); + } + if (type.getName() != CHUNKHEADER) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found constant pool(" + id + ") that is never used"); + } + ConstantMap pool = new ConstantMap(ObjectFactory.create(type, timeConverter), type.getName()); + lookup = new ConstantLookup(pool, type); + constantLookups.put(type.getId(), lookup); + } + Parser parser = parsers.get(id); + if (parser == null) { + throw new IOException("Could not find constant pool type with id = " + id); + } + try { + int count = input.readInt(); + if (count == 0) { + throw new InternalError("Pool " + type.getName() + " must contain at least one element "); + } + if (logTrace) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant Pool " + i + ": " + type.getName()); + } + for (int j = 0; j < count; j++) { + long key = input.readLong(); + Object resolved = lookup.getPreviousResolved(key); + if (resolved == null) { + Object v = parser.parse(input); + logConstant(key, v, false); + lookup.getLatestPool().put(key, v); + } else { + parser.skip(input); + logConstant(key, resolved, true); + lookup.getLatestPool().putResolved(key, resolved); + } + } + } catch (Exception e) { + throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + lastCP + ", " + lastCP + size + "]", + e); + } + } + if (input.position() != lastCP + size) { + throw new IOException("Size of check point event doesn't match content"); + } + } + } + + private void logConstant(long key, Object v, boolean preresolved) { + if (!Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE)) { + return; + } + String valueText; + if (v.getClass().isArray()) { + Object[] array = (Object[]) v; + StringJoiner sj = new StringJoiner(", ", "{", "}"); + for (int i = 0; i < array.length; i++) { + sj.add(textify(array[i])); + } + valueText = sj.toString(); + } else { + valueText = textify(v); + } + String suffix = preresolved ? " (presolved)" :""; + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant: " + key + " = " + valueText + suffix); + } + + private String textify(Object o) { + if (o == null) { // should not happen + return "null"; + } + if (o instanceof String) { + return "\"" + String.valueOf(o) + "\""; + } + if (o instanceof RecordedObject) { + return o.getClass().getName(); + } + if (o.getClass().isArray()) { + Object[] array = (Object[]) o; + if (array.length > 0) { + return textify(array[0]) + "[]"; // can it be recursive? + } + } + return String.valueOf(o); + } + + private String getName(long id) { + Type type = typeMap.get(id); + return type == null ? ("unknown(" + id + ")") : type.getName(); + } + + public Collection getTypes() { + return metadata.getTypes(); + } + + public List getEventTypes() { + return metadata.getEventTypes(); + } + + public boolean isLastChunk() throws IOException { + return chunkHeader.isLastChunk(); + } + + ChunkParser newChunkParser() throws IOException { + return new ChunkParser(this); + } + + public boolean isChunkFinished() { + return chunkFinished; + } + + public void setFlushOperation(Runnable flushOperation) { + this.flushOperation = flushOperation; + } + + public long getChunkDuration() { + return chunkHeader.getDurationNanos(); + } + + public long getStartNanos() { + return chunkHeader.getStartNanos(); + } + +}