src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
branchJEP-349-branch
changeset 57360 5d043a159d5c
parent 52850 f527b24990d7
child 57372 50ca040843ea
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Fri May 17 15:53:21 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Fri May 17 16:02:27 2019 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.StringJoiner;
 
 import jdk.jfr.EventType;
 import jdk.jfr.internal.LogLevel;
@@ -35,7 +36,9 @@
 import jdk.jfr.internal.Logger;
 import jdk.jfr.internal.MetadataDescriptor;
 import jdk.jfr.internal.Type;
+import jdk.jfr.internal.Utils;
 import jdk.jfr.internal.consumer.ChunkHeader;
+import jdk.jfr.internal.consumer.Parser;
 import jdk.jfr.internal.consumer.RecordingInput;
 
 /**
@@ -45,38 +48,127 @@
 final class ChunkParser {
     private static final long CONSTANT_POOL_TYPE_ID = 1;
     private final RecordingInput input;
-    private final LongMap<Parser> parsers;
     private final ChunkHeader chunkHeader;
-    private final long absoluteChunkEnd;
     private final MetadataDescriptor metadata;
-    private final LongMap<Type> typeMap;
     private final TimeConverter timeConverter;
+    private final MetadataDescriptor previousMetadata;
+    private final long pollInterval;
+    private final LongMap<ConstantLookup> constantLookups;
+
+    private LongMap<Type> typeMap;
+    private LongMap<Parser> parsers;
+    private boolean chunkFinished;
+    private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
 
     public ChunkParser(RecordingInput input) throws IOException {
-        this(new ChunkHeader(input));
+        this(new ChunkHeader(input), null, 500);
     }
 
-    private ChunkParser(ChunkHeader header) throws IOException {
+    private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException {
         this.input = header.getInput();
         this.chunkHeader = header;
-        this.metadata = header.readMetadata();
-        this.absoluteChunkEnd = header.getEnd();
+        if (previous == null) {
+            this.pollInterval = 500;
+            this.constantLookups = new LongMap<>();
+            this.previousMetadata = null;
+        } else {
+            this.constantLookups = previous.constantLookups;
+            this.previousMetadata = previous.metadata;
+            this.pollInterval = previous.pollInterval;
+        }
+        this.metadata = header.readMetadata(previousMetadata);
         this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset());
-
-        ParserFactory factory = new ParserFactory(metadata, timeConverter);
-        LongMap<ConstantMap> constantPools = factory.getConstantPools();
-        parsers = factory.getParsers();
-        typeMap = factory.getTypeMap();
-
-        fillConstantPools(parsers, constantPools);
-        constantPools.forEach(ConstantMap::setIsResolving);
-        constantPools.forEach(ConstantMap::resolve);
-        constantPools.forEach(ConstantMap::setResolved);
+        if (metadata != previousMetadata) {
+            ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
+            parsers = factory.getParsers();
+            typeMap = factory.getTypeMap();
+        } else {
+            parsers = previous.parsers;
+            typeMap = previous.typeMap;
+        }
+        constantLookups.forEach(c -> c.newPool());
+        fillConstantPools(0);
+        constantLookups.forEach(c -> c.getLatestPool().setResolving());
+        constantLookups.forEach(c -> c.getLatestPool().resolve());
+        constantLookups.forEach(c -> c.getLatestPool().setResolved());
 
         input.position(chunkHeader.getEventStart());
     }
 
+    public void setParserFilter(InternalEventFilter filter) {
+    // Disable low level filter, since it doesn't work
+    // when a psrser is shared
+    //    this.eventFilter = filter;
+    //    updateParserFilters();
+    }
+
+    public InternalEventFilter getEventFilter() {
+        return this.eventFilter;
+    }
+
+    private void updateParserFilters() {
+        parsers.forEach(p -> {
+            if (p instanceof EventParser) {
+                EventParser ep = (EventParser) p;
+                long threshold = eventFilter.getThreshold(ep.getEventType().getName());
+                if (threshold >= 0) {
+                    ep.setEnabled(true);
+                    ep.setThreshold(timeConverter.convertDurationNanos(threshold));
+                } else {
+                    ep.setThreshold(-1L);
+                }
+            }
+        });
+    }
+
+    /**
+     * Reads an event and returns null when segment or chunk ends.
+     */
+    public RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException {
+        long absoluteChunkEnd = chunkHeader.getEnd();
+        while (true) {
+            RecordedEvent event = readEvent();
+            if (event != null) {
+                return event;
+            }
+            if (!awaitNewEvents) {
+                return null;
+            }
+            long lastValid = absoluteChunkEnd;
+            long metadataPoistion = chunkHeader.getMetataPosition();
+            long contantPosition = chunkHeader.getConstantPoolPosition();
+            chunkFinished = awaitUpdatedHeader(absoluteChunkEnd);
+            if (chunkFinished) {
+                Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end");
+                return null;
+            }
+            absoluteChunkEnd = chunkHeader.getEnd();
+            // Read metadata and constant pools for the next segment
+            if (chunkHeader.getMetataPosition() != metadataPoistion) {
+                Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers");
+                MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata);
+                ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
+                parsers = factory.getParsers();
+                typeMap = factory.getTypeMap();
+                updateParserFilters();
+            }
+            if (contantPosition != chunkHeader.getConstantPoolPosition()) {
+                Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
+                constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false));
+                fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart());
+                constantLookups.forEach(c -> c.getLatestPool().setResolving());
+                constantLookups.forEach(c -> c.getLatestPool().resolve());
+                constantLookups.forEach(c -> c.getLatestPool().setResolved());
+            }
+            input.position(lastValid);
+        }
+    }
+
+    /**
+     * Reads an event and returns null when the chunk ends
+     */
     public RecordedEvent readEvent() throws IOException {
+        long absoluteChunkEnd = chunkHeader.getEnd();
         while (input.position() < absoluteChunkEnd) {
             long pos = input.position();
             int size = input.readInt();
@@ -84,10 +176,16 @@
                 throw new IOException("Event can't have zero size");
             }
             long typeId = input.readLong();
-            if (typeId > CONSTANT_POOL_TYPE_ID) { // also skips metadata (id=0)
-                Parser ep = parsers.get(typeId);
-                if (ep instanceof EventParser) {
-                    return (RecordedEvent) ep.parse(input);
+            // Skip metadata and constant pool events (id = 0, id = 1)
+            if (typeId > CONSTANT_POOL_TYPE_ID) {
+                Parser p = parsers.get(typeId);
+                if (p instanceof EventParser) {
+                    EventParser ep = (EventParser) p;
+                    RecordedEvent event = ep.parse(input);
+                    if (event != null) {
+                        input.position(pos + size);
+                        return event;
+                    }
                 }
             }
             input.position(pos + size);
@@ -95,62 +193,132 @@
         return null;
     }
 
-    private void fillConstantPools(LongMap<Parser> typeParser, LongMap<ConstantMap> constantPools) throws IOException {
-        long nextCP = chunkHeader.getAbsoluteChunkStart();
-        long deltaToNext = chunkHeader.getConstantPoolPosition();
-        while (deltaToNext != 0) {
-            nextCP += deltaToNext;
-            input.position(nextCP);
-            final long position = nextCP;
+    private boolean awaitUpdatedHeader(long absoluteChunkEnd) throws IOException {
+        Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
+        while (true) {
+            chunkHeader.refresh();
+            if (absoluteChunkEnd != chunkHeader.getEnd()) {
+                return false;
+            }
+            if (chunkHeader.isFinished()) {
+                return true;
+            }
+            Utils.takeNap(pollInterval);
+        }
+    }
+
+    private void fillConstantPools(long abortCP) throws IOException {
+        long thisCP = chunkHeader.getConstantPoolPosition() + chunkHeader.getAbsoluteChunkStart();
+        long lastCP = -1;
+        long delta = -1;
+        boolean log = Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE);
+        while (thisCP != abortCP && delta != 0) {
+            input.position(thisCP);
+            lastCP = thisCP;
             int size = input.readInt(); // size
             long typeId = input.readLong();
             if (typeId != CONSTANT_POOL_TYPE_ID) {
-                throw new IOException("Expected check point event (id = 1) at position " + nextCP + ", but found type id = " + typeId);
+                throw new IOException("Expected check point event (id = 1) at position " + lastCP + ", but found type id = " + typeId);
             }
             input.readLong(); // timestamp
             input.readLong(); // duration
-            deltaToNext = input.readLong();
-            final long delta = deltaToNext;
+            delta = input.readLong();
+            thisCP += delta;
             boolean flush = input.readBoolean();
             int poolCount = input.readInt();
+            final long logLastCP = lastCP;
+            final long logDelta = delta;
             Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> {
-                return "New constant pool: startPosition=" + position + ", size=" + size + ", deltaToNext=" + delta + ", flush=" + flush + ", poolCount=" + poolCount;
+                return "New constant pool: startPosition=" + logLastCP + ", size=" + size + ", deltaToNext=" + logDelta + ", flush=" + flush + ", poolCount=" + poolCount;
             });
-
             for (int i = 0; i < poolCount; i++) {
                 long id = input.readLong(); // type id
-                ConstantMap pool = constantPools.get(id);
+                ConstantLookup lookup = constantLookups.get(id);
                 Type type = typeMap.get(id);
-                if (pool == null) {
+                if (lookup == null) {
                     Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found constant pool(" + id + ") that is never used");
                     if (type == null) {
-                        throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + nextCP + ", " + nextCP + size + "]");
+                        throw new IOException(
+                                "Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + lastCP + ", " + lastCP + size + "]");
                     }
-                    pool = new ConstantMap(ObjectFactory.create(type, timeConverter), type.getName());
-                    constantPools.put(type.getId(), pool);
+                    ConstantMap pool = new ConstantMap(ObjectFactory.create(type, timeConverter), type.getName());
+                    constantLookups.put(type.getId(), new ConstantLookup(pool, type));
                 }
-                Parser parser = typeParser.get(id);
+                Parser parser = parsers.get(id);
                 if (parser == null) {
                     throw new IOException("Could not find constant pool type with id = " + id);
                 }
                 try {
                     int count = input.readInt();
-                    Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, () -> "Constant: " + getName(id) + "[" + count + "]");
+                    if (count == 0) {
+                        throw new InternalError("Pool " + type.getName() + " must contain at least one element ");
+                    }
+                    if (log) {
+                        Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant Pool " + i + ": " + type.getName());
+                    }
                     for (int j = 0; j < count; j++) {
                         long key = input.readLong();
-                        Object value = parser.parse(input);
-                        pool.put(key, value);
+//                      Object resolved = lookup.getCurrent(key);
+// Disable cache        Object resolved = lookup.getResolved(key);
+//                      if (resolved == null) {
+                            Object v = parser.parse(input);
+                            logConstant(key, v, false);
+                            lookup.getLatestPool().put(key, v);
+//                        } else {
+//                            parser.skip(input);
+//                            logConstant(key, resolved, true);
+// Disable cache            lookup.getLatestPool().putResolved(key, resolved);
+//                        }
                     }
                 } catch (Exception e) {
-                    throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + nextCP + ", " + nextCP + size + "]", e);
+                    throw new IOException("Error parsing constant pool type " + getName(id) + " at position " + input.position() + " at check point between [" + lastCP + ", " + lastCP + size + "]",
+                            e);
                 }
             }
-            if (input.position() != nextCP + size) {
+            if (input.position() != lastCP + size) {
                 throw new IOException("Size of check point event doesn't match content");
             }
         }
     }
 
+    private void logConstant(long key, Object v, boolean preresolved) {
+        if (!Logger.shouldLog(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE)) {
+            return;
+        }
+        String valueText;
+        if (v.getClass().isArray()) {
+            Object[] array = (Object[]) v;
+            StringJoiner sj = new StringJoiner(", ", "{", "}");
+            for (int i = 0; i < array.length; i++) {
+                sj.add(textify(array[i]));
+            }
+            valueText = sj.toString();
+        } else {
+            valueText = textify(v);
+        }
+        String suffix  = preresolved ? " (presolved)" :"";
+        Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.TRACE, "Constant: " + key + " = " + valueText + suffix);
+    }
+
+    private String textify(Object o) {
+        if (o == null) { // should not happen
+            return "null";
+        }
+        if (o instanceof String) {
+            return "\"" + String.valueOf(o) + "\"";
+        }
+        if (o instanceof RecordedObject) {
+            return o.getClass().getName();
+        }
+        if (o.getClass().isArray()) {
+            Object[] array = (Object[]) o;
+            if (array.length > 0) {
+                return textify(array[0]) + "[]"; // can it be recursive?
+            }
+        }
+        return String.valueOf(o);
+    }
+
     private String getName(long id) {
         Type type = typeMap.get(id);
         return type == null ? ("unknown(" + id + ")") : type.getName();
@@ -164,11 +332,15 @@
         return metadata.getEventTypes();
     }
 
-    public boolean isLastChunk() {
+    public boolean isLastChunk() throws IOException {
         return chunkHeader.isLastChunk();
     }
 
     public ChunkParser nextChunkParser() throws IOException {
-        return new ChunkParser(chunkHeader.nextHeader());
+        return new ChunkParser(chunkHeader.nextHeader(), this, pollInterval);
+    }
+
+    public boolean isChunkFinished() {
+        return chunkFinished;
     }
 }