diff -r 4cab5edc2950 -r 5d043a159d5c src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri May 17 15:53:21 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java Fri May 17 16:02:27 2019 +0200 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.StringJoiner; import jdk.jfr.EventType; import jdk.jfr.internal.LogLevel; @@ -35,7 +36,9 @@ import jdk.jfr.internal.Logger; import jdk.jfr.internal.MetadataDescriptor; import jdk.jfr.internal.Type; +import jdk.jfr.internal.Utils; import jdk.jfr.internal.consumer.ChunkHeader; +import jdk.jfr.internal.consumer.Parser; import jdk.jfr.internal.consumer.RecordingInput; /** @@ -45,38 +48,127 @@ final class ChunkParser { private static final long CONSTANT_POOL_TYPE_ID = 1; private final RecordingInput input; - private final LongMap parsers; private final ChunkHeader chunkHeader; - private final long absoluteChunkEnd; private final MetadataDescriptor metadata; - private final LongMap typeMap; private final TimeConverter timeConverter; + private final MetadataDescriptor previousMetadata; + private final long pollInterval; + private final LongMap constantLookups; + + private LongMap typeMap; + private LongMap parsers; + private boolean chunkFinished; + private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; public ChunkParser(RecordingInput input) throws IOException { - this(new ChunkHeader(input)); + this(new ChunkHeader(input), null, 500); } - private ChunkParser(ChunkHeader header) throws IOException { + private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException { this.input = header.getInput(); this.chunkHeader = header; - this.metadata = header.readMetadata(); - this.absoluteChunkEnd = header.getEnd(); + if (previous == null) { + this.pollInterval = 500; + this.constantLookups = new LongMap<>(); + this.previousMetadata = null; + } else { + this.constantLookups = previous.constantLookups; + this.previousMetadata = previous.metadata; + this.pollInterval = previous.pollInterval; + } + this.metadata = header.readMetadata(previousMetadata); this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset()); - - ParserFactory factory = new ParserFactory(metadata, timeConverter); - LongMap constantPools = factory.getConstantPools(); - parsers = factory.getParsers(); - typeMap = factory.getTypeMap(); - - fillConstantPools(parsers, constantPools); - constantPools.forEach(ConstantMap::setIsResolving); - constantPools.forEach(ConstantMap::resolve); - constantPools.forEach(ConstantMap::setResolved); + if (metadata != previousMetadata) { + ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); + parsers = factory.getParsers(); + typeMap = factory.getTypeMap(); + } else { + parsers = previous.parsers; + typeMap = previous.typeMap; + } + constantLookups.forEach(c -> c.newPool()); + fillConstantPools(0); + constantLookups.forEach(c -> c.getLatestPool().setResolving()); + constantLookups.forEach(c -> c.getLatestPool().resolve()); + constantLookups.forEach(c -> c.getLatestPool().setResolved()); input.position(chunkHeader.getEventStart()); } + public void setParserFilter(InternalEventFilter filter) { + // Disable low level filter, since it doesn't work + // when a psrser is shared + // this.eventFilter = filter; + // updateParserFilters(); + } + + public InternalEventFilter getEventFilter() { + return this.eventFilter; + } + + private void updateParserFilters() { + parsers.forEach(p -> { + if (p instanceof EventParser) { + EventParser ep = (EventParser) p; + long threshold = eventFilter.getThreshold(ep.getEventType().getName()); + if (threshold >= 0) { + ep.setEnabled(true); + ep.setThreshold(timeConverter.convertDurationNanos(threshold)); + } else { + ep.setThreshold(-1L); + } + } + }); + } + + /** + * Reads an event and returns null when segment or chunk ends. + */ + public RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException { + long absoluteChunkEnd = chunkHeader.getEnd(); + while (true) { + RecordedEvent event = readEvent(); + if (event != null) { + return event; + } + if (!awaitNewEvents) { + return null; + } + long lastValid = absoluteChunkEnd; + long metadataPoistion = chunkHeader.getMetataPosition(); + long contantPosition = chunkHeader.getConstantPoolPosition(); + chunkFinished = awaitUpdatedHeader(absoluteChunkEnd); + if (chunkFinished) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end"); + return null; + } + absoluteChunkEnd = chunkHeader.getEnd(); + // Read metadata and constant pools for the next segment + if (chunkHeader.getMetataPosition() != metadataPoistion) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers"); + MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata); + ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); + parsers = factory.getParsers(); + typeMap = factory.getTypeMap(); + updateParserFilters(); + } + if (contantPosition != chunkHeader.getConstantPoolPosition()) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values"); + constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false)); + fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart()); + constantLookups.forEach(c -> c.getLatestPool().setResolving()); + constantLookups.forEach(c -> c.getLatestPool().resolve()); + constantLookups.forEach(c -> c.getLatestPool().setResolved()); + } + input.position(lastValid); + } + } + + /** + * Reads an event and returns null when the chunk ends + */ public RecordedEvent readEvent() throws IOException { + long absoluteChunkEnd = chunkHeader.getEnd(); while (input.position() < absoluteChunkEnd) { long pos = input.position(); int size = input.readInt(); @@ -84,10 +176,16 @@ throw new IOException("Event can't have zero size"); } long typeId = input.readLong(); - if (typeId > CONSTANT_POOL_TYPE_ID) { // also skips metadata (id=0) - Parser ep = parsers.get(typeId); - if (ep instanceof EventParser) { - return (RecordedEvent) ep.parse(input); + // Skip metadata and constant pool events (id = 0, id = 1) + if (typeId > CONSTANT_POOL_TYPE_ID) { + Parser p = parsers.get(typeId); + if (p instanceof EventParser) { + EventParser ep = (EventParser) p; + RecordedEvent event = ep.parse(input); + if (event != null) { + input.position(pos + size); + return event; + } } } input.position(pos + size); @@ -95,62 +193,132 @@ return null; } - private void fillConstantPools(LongMap typeParser, LongMap constantPools) throws IOException { - long nextCP = chunkHeader.getAbsoluteChunkStart(); - long deltaToNext = chunkHeader.getConstantPoolPosition(); - while (deltaToNext != 0) { - nextCP += deltaToNext; - input.position(nextCP); - final long position = nextCP; + private boolean awaitUpdatedHeader(long absoluteChunkEnd) throws IOException { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes"); + while (true) { + chunkHeader.refresh(); + if (absoluteChunkEnd != chunkHeader.getEnd()) { + return false; + } + if (chunkHeader.isFinished()) { + return true; + } + Utils.takeNap(pollInterval); + } + } + + private void fillConstantPools(long abortCP) throws IOException { + long thisCP = chunkHeader.getConstantPoolPosition() + chunkHeader.getAbsoluteChunkStart(); + long lastCP = -1; + long delta = -1; + boolean log = Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE); + while (thisCP != abortCP && delta != 0) { + input.position(thisCP); + lastCP = thisCP; int size = input.readInt(); // size long typeId = input.readLong(); if (typeId != CONSTANT_POOL_TYPE_ID) { - throw new IOException("Expected check point event (id = 1) at position " + nextCP + ", but found type id = " + typeId); + throw new IOException("Expected check point event (id = 1) at position " + lastCP + ", but found type id = " + typeId); } input.readLong(); // timestamp input.readLong(); // duration - deltaToNext = input.readLong(); - final long delta = deltaToNext; + delta = input.readLong(); + thisCP += delta; boolean flush = input.readBoolean(); int poolCount = input.readInt(); + final long logLastCP = lastCP; + final long logDelta = delta; Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> { - return "New constant pool: startPosition=" + position + ", size=" + size + ", deltaToNext=" + delta + ", flush=" + flush + ", poolCount=" + poolCount; + return "New constant pool: startPosition=" + logLastCP + ", size=" + size + ", deltaToNext=" + logDelta + ", flush=" + flush + ", poolCount=" + poolCount; }); - for (int i = 0; i < poolCount; i++) { long id = input.readLong(); // type id - ConstantMap pool = constantPools.get(id); + ConstantLookup lookup = constantLookups.get(id); Type type = typeMap.get(id); - if (pool == null) { + if (lookup == null) { Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found constant pool(" + id + ") that is never used"); if (type == null) { - throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + nextCP + ", " + nextCP + size + "]"); + throw new IOException( + "Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + lastCP + ", " + lastCP + size + "]"); } - pool = new ConstantMap(ObjectFactory.create(type, timeConverter), type.getName()); - constantPools.put(type.getId(), pool); + ConstantMap pool = new ConstantMap(ObjectFactory.create(type, timeConverter), type.getName()); + constantLookups.put(type.getId(), new ConstantLookup(pool, type)); } - Parser parser = typeParser.get(id); + Parser parser = parsers.get(id); if (parser == null) { throw new IOException("Could not find constant pool type with id = " + id); } try { int count = input.readInt(); - Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> "Constant: " + getName(id) + "[" + count + "]"); + if (count == 0) { + throw new InternalError("Pool " + type.getName() + " must contain at least one element "); + } + if (log) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant Pool " + i + ": " + type.getName()); + } for (int j = 0; j < count; j++) { long key = input.readLong(); - Object value = parser.parse(input); - pool.put(key, value); +// Object resolved = lookup.getCurrent(key); +// Disable cache Object resolved = lookup.getResolved(key); +// if (resolved == null) { + Object v = parser.parse(input); + logConstant(key, v, false); + lookup.getLatestPool().put(key, v); +// } else { +// parser.skip(input); +// logConstant(key, resolved, true); +// Disable cache lookup.getLatestPool().putResolved(key, resolved); +// } } } catch (Exception e) { - throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + nextCP + ", " + nextCP + size + "]", e); + throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + lastCP + ", " + lastCP + size + "]", + e); } } - if (input.position() != nextCP + size) { + if (input.position() != lastCP + size) { throw new IOException("Size of check point event doesn't match content"); } } } + private void logConstant(long key, Object v, boolean preresolved) { + if (!Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE)) { + return; + } + String valueText; + if (v.getClass().isArray()) { + Object[] array = (Object[]) v; + StringJoiner sj = new StringJoiner(", ", "{", "}"); + for (int i = 0; i < array.length; i++) { + sj.add(textify(array[i])); + } + valueText = sj.toString(); + } else { + valueText = textify(v); + } + String suffix = preresolved ? " (presolved)" :""; + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant: " + key + " = " + valueText + suffix); + } + + private String textify(Object o) { + if (o == null) { // should not happen + return "null"; + } + if (o instanceof String) { + return "\"" + String.valueOf(o) + "\""; + } + if (o instanceof RecordedObject) { + return o.getClass().getName(); + } + if (o.getClass().isArray()) { + Object[] array = (Object[]) o; + if (array.length > 0) { + return textify(array[0]) + "[]"; // can it be recursive? + } + } + return String.valueOf(o); + } + private String getName(long id) { Type type = typeMap.get(id); return type == null ? ("unknown(" + id + ")") : type.getName(); @@ -164,11 +332,15 @@ return metadata.getEventTypes(); } - public boolean isLastChunk() { + public boolean isLastChunk() throws IOException { return chunkHeader.isLastChunk(); } public ChunkParser nextChunkParser() throws IOException { - return new ChunkParser(chunkHeader.nextHeader()); + return new ChunkParser(chunkHeader.nextHeader(), this, pollInterval); + } + + public boolean isChunkFinished() { + return chunkFinished; } }