src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
branchJEP-349-branch
changeset 57985 be121cbf3284
parent 57971 aa7b1ea52413
child 58020 f082177c5023
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Mon Sep 02 21:08:41 2019 +0200
@@ -28,12 +28,8 @@
 import java.io.IOException;
 import java.nio.file.Path;
 import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.Arrays;
 import java.util.Objects;
-import java.util.function.Consumer;
 
 import jdk.jfr.internal.consumer.FileAccess;
 import jdk.jfr.internal.consumer.RecordingInput;
@@ -42,196 +38,110 @@
  * Implementation of an event stream that operates against a recording file.
  *
  */
-final class EventFileStream implements EventStream {
-
-    private final static class FileStream extends AbstractEventStream {
-        private static final int DEFAULT_ARRAY_SIZE = 100_000;
-
-        private final RecordingInput input;
-
-        private ChunkParser chunkParser;
-        private RecordedEvent[] sortedList;
-
-        public FileStream(AccessControlContext acc, Path path) throws IOException {
-            super(acc, false);
-            this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
-;        }
-
-        @Override
-        public void process() throws IOException {
-            final StreamConfiguration c1 = configuration;
-            long start = 0;
-            long end = Long.MAX_VALUE;
-            if (c1.getStartTime() != null) {
-                start = c1.getStartNanos();
-            }
-            if (c1.getEndTime() != null) {
-                end = c1.getEndNanos();
-            }
+final class EventFileStream extends AbstractEventStream {
+    private final RecordingInput input;
+    private ChunkParser chunkParser;
+    private RecordedEvent[] sortedList;
 
-            chunkParser = new ChunkParser(input, c1.getReuse());
-            while (!isClosed()) {
-                if (chunkParser.getStartNanos() > end) {
-                    close();
-                    return;
-                }
-                StreamConfiguration c2 = configuration;
-                boolean ordered = c2.getOrdered();
-                chunkParser.setFlushOperation(flushOperation);
-                chunkParser.setFirstNanos(start);
-                chunkParser.setLastNanos(end);
-                chunkParser.setReuse(c2.getReuse());
-                chunkParser.setOrdered(ordered);
-                chunkParser.resetEventCache();
-                chunkParser.setParserFilter(c2.getFiler());
-                chunkParser.updateEventParsers();
-                clearLastDispatch();
-                if (ordered) {
-                    processOrdered();
-                } else {
-                    processUnordered();
-                }
-                if (chunkParser.isLastChunk()) {
-                    return;
-                }
-                chunkParser = chunkParser.nextChunkParser();
-            }
-        }
-
-        private void processOrdered() throws IOException {
-            if (sortedList == null) {
-                sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
-            }
-            RecordedEvent event;
-            int index = 0;
-            while (true) {
-                event = chunkParser.readEvent();
-                if (event == null) {
-                    Arrays.sort(sortedList, 0, index, END_TIME);
-                    for (int i = 0; i < index; i++) {
-                        dispatch(sortedList[i]);
-                    }
-                    return;
-                }
-                if (index == sortedList.length) {
-                    RecordedEvent[] tmp = sortedList;
-                    sortedList = new RecordedEvent[2 * tmp.length];
-                    System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
-                }
-                sortedList[index++] = event;
-            }
-        }
-
-        private void processUnordered() throws IOException {
-            RecordedEvent event;
-            while (!isClosed()) {
-                event = chunkParser.readEvent();
-                if (event == null) {
-                    return;
-                }
-                dispatch(event);
-            }
-        }
-
-        @Override
-        public void close() {
-            setClosed(true);;
-            runCloseActions();
-            try {
-                input.close();
-            } catch (IOException e) {
-                // ignore
-            }
-        }
-    }
-
-    private final FileStream eventStream;
-
-    public EventFileStream(Path path, Instant from, Instant to) throws IOException {
+    public EventFileStream(AccessControlContext acc, Path path) throws IOException {
+        super(acc, false);
         Objects.requireNonNull(path);
-        eventStream = new FileStream(AccessController.getContext(), path);
+        this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
     }
 
     @Override
-    public void onEvent(Consumer<RecordedEvent> action) {
-        Objects.requireNonNull(action);
-        eventStream.onEvent(action);
+    public void start() {
+        start(0);
     }
 
     @Override
-    public void onEvent(String eventName, Consumer<RecordedEvent> action) {
-        Objects.requireNonNull(eventName);
-        Objects.requireNonNull(action);
-        eventStream.onEvent(eventName, action);
-    }
-
-    @Override
-    public void onFlush(Runnable action) {
-        Objects.requireNonNull(action);
-        eventStream.onFlush(action);
-    }
-
-    @Override
-    public void onClose(Runnable action) {
-        Objects.requireNonNull(action);
-        eventStream.addCloseAction(action);
+    public void startAsync() {
+        startAsync(0);
     }
 
     @Override
     public void close() {
-        eventStream.close();
-    }
-
-    @Override
-    public boolean remove(Object action) {
-        Objects.requireNonNull(action);
-        return eventStream.remove(action);
-    }
-
-    @Override
-    public void start() {
-        eventStream.start(0);
-    }
-
-    @Override
-    public void setReuse(boolean reuse) {
-        eventStream.setReuse(reuse);
-    }
-
-    @Override
-    public void startAsync() {
-        eventStream.startAsync(0);
+        setClosed(true);
+        runCloseActions();
+        try {
+            input.close();
+        } catch (IOException e) {
+            // ignore
+        }
     }
 
     @Override
-    public void awaitTermination(Duration timeout) {
-        Objects.requireNonNull(timeout);
-        eventStream.awaitTermination(timeout);
-    }
+    public void process() throws IOException {
+        StreamConfiguration c = configuration;
+        long start = 0;
+        long end = Long.MAX_VALUE;
+        if (c.getStartTime() != null) {
+            start = c.getStartNanos();
+        }
+        if (c.getEndTime() != null) {
+            end = c.getEndNanos();
+        }
 
-    @Override
-    public void awaitTermination() {
-        eventStream.awaitTermination();
-    }
-
-    @Override
-    public void setOrdered(boolean ordered) {
-        eventStream.setOrdered(ordered);
+        chunkParser = new ChunkParser(input, c.getReuse());
+        while (!isClosed()) {
+            if (chunkParser.getStartNanos() > end) {
+                close();
+                return;
+            }
+            c = configuration;
+            boolean ordered = c.getOrdered();
+            chunkParser.setFlushOperation(getFlushOperation());
+            chunkParser.setFilterStart(start);
+            chunkParser.setFilterEnd(end);
+            chunkParser.setReuse(c.getReuse());
+            chunkParser.setOrdered(ordered);
+            chunkParser.resetEventCache();
+            chunkParser.setParserFilter(c.getFiler());
+            chunkParser.updateEventParsers();
+            clearLastDispatch();
+            if (ordered) {
+                processOrdered();
+            } else {
+                processUnordered();
+            }
+            if (chunkParser.isLastChunk()) {
+                return;
+            }
+            chunkParser = chunkParser.nextChunkParser();
+        }
     }
 
-    @Override
-    public void setStartTime(Instant startTime) {
-        eventStream.setStartTime(startTime);
+    private void processOrdered() throws IOException {
+        if (sortedList == null) {
+            sortedList = new RecordedEvent[10_000];
+        }
+        RecordedEvent event;
+        int index = 0;
+        while (true) {
+            event = chunkParser.readEvent();
+            if (event == null) {
+                Arrays.sort(sortedList, 0, index, END_TIME);
+                for (int i = 0; i < index; i++) {
+                    dispatch(sortedList[i]);
+                }
+                return;
+            }
+            if (index == sortedList.length) {
+                RecordedEvent[] tmp = sortedList;
+                sortedList = new RecordedEvent[2 * tmp.length];
+                System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
+            }
+            sortedList[index++] = event;
+        }
     }
 
-    @Override
-    public void setEndTime(Instant endTime) {
-        eventStream.setEndTime(endTime);
-    }
-
-    @Override
-    public void onError(Consumer<Throwable> action) {
-        // TODO Auto-generated method stub
-
+    private void processUnordered() throws IOException {
+        while (!isClosed()) {
+            RecordedEvent event = chunkParser.readEvent();
+            if (event == null) {
+                return;
+            }
+            dispatch(event);
+        }
     }
 }