src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java
changeset 58863 c16ac7a2eba4
child 59226 a0f39cc47387
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Wed Oct 30 19:43:52 2019 +0100
@@ -0,0 +1,364 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.consumer;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+import jdk.jfr.internal.SecuritySupport;
+import jdk.jfr.internal.Utils;
+import jdk.jfr.internal.consumer.EventDirectoryStream;
+import jdk.jfr.internal.consumer.EventFileStream;
+import jdk.jfr.internal.consumer.FileAccess;
+
+/**
+ * Represents a stream of events.
+ * <p>
+ * A stream is a sequence of events and the way to interact with a stream is to
+ * register actions. The {@code EventStream} interface is not to be implemented
+ * and future versions of the JDK may prevent this completely.
+ * <p>
+ * To receive a notification when an event arrives, register an action using the
+ * {@link #onEvent(Consumer)} method. To filter the stream for an event with a
+ * specific name, use {@link #onEvent(String, Consumer)} method.
+ * <p>
+ * By default, the same {@code RecordedEvent} object can be used to
+ * represent two or more distinct events. That object can be delivered
+ * multiple times to the same action as well as to other actions. To use an
+ * event object after the action is completed, the
+ * {@link #setReuse(boolean)} method should be set to {@code false} so a
+ * new object is allocated for each event.
+ * <p>
+ * Events are delivered in batches. To receive a notification when a batch is
+ * complete, register an action using the {@link #onFlush(Runnable)} method.
+ * This is an opportunity to aggregate or push data to external systems while
+ * the Java Virtual Machine (JVM) is preparing the next batch.
+ * <p>
+ * Events within a batch are sorted chronologically by their end time.
+ * Well-ordering of events is only maintained for events available to the JVM at
+ * the point of flush, i.e. for the set of events delivered as a unit in a
+ * single batch. Events delivered in a batch could therefore be out-of-order
+ * compared to events delivered in a previous batch, but never out-of-order with
+ * events within the same batch. If ordering is not a concern, sorting can be
+ * disabled using the {@link #setOrdered(boolean)} method.
+ * <p>
+ * To dispatch events to registered actions, the stream must be started. To
+ * start processing in the current thread, invoke the {@link #start()} method.
+ * To process actions asynchronously in a separate thread, invoke the
+ * {@link #startAsync()} method. To await completion of the stream, use the
+ * awaitTermination {@link #awaitTermination()} or the
+ * {@link #awaitTermination(Duration)} method.
+ * <p>
+ * When a stream ends it is automatically closed. To manually stop processing of
+ * events, close the stream by invoking the {@link #close()} method. A stream
+ * can also be automatically closed in exceptional circumstances, for example if
+ * the JVM that is being monitored exits. To receive a notification in any of
+ * these occasions, use the {@link #onClose(Runnable)} method to register an
+ * action.
+ * <p>
+ * If an unexpected exception occurs in an action, it is possible to catch the
+ * exception in an error handler. An error handler can be registered using the
+ * {@link #onError(Runnable)} method. If no error handler is registered, the
+ * default behavior is to print the exception and its backtrace to the standard
+ * error stream.
+ * <p>
+ * The following example shows how an {@code EventStream} can be used to listen
+ * to events on a JVM running Flight Recorder
+ *
+ * <pre>
+ * <code>
+ * try (var es = EventStream.openRepository()) {
+ *   es.onEvent("jdk.CPULoad", event -> {
+ *     System.out.println("CPU Load " + event.getEndTime());
+ *     System.out.println(" Machine total: " + 100 * event.getFloat("machineTotal") + "%");
+ *     System.out.println(" JVM User: " + 100 * event.getFloat("jvmUser") + "%");
+ *     System.out.println(" JVM System: " + 100 * event.getFloat("jvmSystem") + "%");
+ *     System.out.println();
+ *   });
+ *   es.onEvent("jdk.GarbageCollection", event -> {
+ *     System.out.println("Garbage collection: " + event.getLong("gcId"));
+ *     System.out.println(" Cause: " + event.getString("cause"));
+ *     System.out.println(" Total pause: " + event.getDuration("sumOfPauses"));
+ *     System.out.println(" Longest pause: " + event.getDuration("longestPause"));
+ *     System.out.println();
+ *   });
+ *   es.start();
+ * }
+ * </code>
+ * </pre>
+ * <p>
+ * To start recording together with the stream, see {@link RecordingStream}.
+ *
+ * @since 14
+ */
+public interface EventStream extends AutoCloseable {
+    /**
+     * Creates a stream from the repository of the current Java Virtual Machine
+     * (JVM).
+     * <p>
+     * By default, the stream starts with the next event flushed by Flight
+     * Recorder.
+     *
+     * @return an event stream, not {@code null}
+     *
+     * @throws IOException if a stream can't be opened, or an I/O error occurs
+     *         when trying to access the repository
+     *
+     * @throws SecurityException if a security manager exists and the caller
+     *         does not have
+     *         {@code FlightRecorderPermission("accessFlightRecorder")}
+     */
+    public static EventStream openRepository() throws IOException {
+        Utils.checkAccessFlightRecorder();
+        return new EventDirectoryStream(AccessController.getContext(), null, SecuritySupport.PRIVILIGED, false);
+    }
+
+    /**
+     * Creates an event stream from a disk repository.
+     * <p>
+     * By default, the stream starts with the next event flushed by Flight
+     * Recorder.
+     *
+     * @param directory location of the disk repository, not {@code null}
+     *
+     * @return an event stream, not {@code null}
+     *
+     * @throws IOException if a stream can't be opened, or an I/O error occurs
+     *         when trying to access the repository
+     *
+     * @throws SecurityException if a security manager exists and its
+     *         {@code checkRead} method denies read access to the directory, or
+     *         files in the directory.
+     */
+    public static EventStream openRepository(Path directory) throws IOException {
+        Objects.nonNull(directory);
+        AccessControlContext acc = AccessController.getContext();
+        return new EventDirectoryStream(acc, directory, FileAccess.UNPRIVILIGED, false);
+    }
+
+    /**
+     * Creates an event stream from a file.
+     * <p>
+     * By default, the stream starts with the first event in the file.
+     *
+     * @param file location of the file, not {@code null}
+     *
+     * @return an event stream, not {@code null}
+     *
+     * @throws IOException if the file can't be opened, or an I/O error occurs
+     *         during reading
+     *
+     * @throws SecurityException if a security manager exists and its
+     *         {@code checkRead} method denies read access to the file
+     */
+    static EventStream openFile(Path file) throws IOException {
+        return new EventFileStream(AccessController.getContext(), file);
+    }
+
+    /**
+     * Registers an action to perform on all events in the stream.
+     *
+     * @param action an action to perform on each {@code RecordedEvent}, not
+     *        {@code null}
+     */
+    void onEvent(Consumer<RecordedEvent> action);
+
+    /**
+     * Registers an action to perform on all events matching a name.
+     *
+     * @param eventName the name of the event, not {@code null}
+     *
+     * @param action an action to perform on each {@code RecordedEvent} matching
+     *        the event name, not {@code null}
+     */
+    void onEvent(String eventName, Consumer<RecordedEvent> action);
+
+    /**
+     * Registers an action to perform after the stream has been flushed.
+     *
+     * @param action an action to perform after the stream has been
+     *        flushed, not {@code null}
+     */
+    void onFlush(Runnable action);
+
+    /**
+     * Registers an action to perform if an exception occurs.
+     * <p>
+     * if an action is not registered, an exception stack trace is printed to
+     * standard error.
+     * <p>
+     * Registering an action overrides the default behavior. If multiple actions
+     * have been registered, they are performed in the order of registration.
+     * <p>
+     * If this method itself throws an exception, resulting behavior is
+     * undefined.
+     *
+     * @param action an action to perform if an exception occurs, not
+     *        {@code null}
+     */
+    void onError(Consumer<Throwable> action);
+
+    /**
+     * Registers an action to perform when the stream is closed.
+     * <p>
+     * If the stream is already closed, the action will be performed immediately
+     * in the current thread.
+     *
+     * @param action an action to perform after the stream is closed, not
+     *        {@code null}
+     * @see #close()
+     */
+    void onClose(Runnable action);
+
+    /**
+     * Releases all resources associated with this stream.
+     * <p>
+     * Closing a previously closed stream has no effect.
+     */
+    void close();
+
+    /**
+     * Unregisters an action.
+     * <p>
+     * If the action has been registered multiple times, all instances are
+     * unregistered.
+     *
+     * @param action the action to unregister, not {@code null}
+     *
+     * @return {@code true} if the action was unregistered, {@code false}
+     *         otherwise
+     *
+     * @see #onEvent(Consumer)
+     * @see #onEvent(String, Consumer)
+     * @see #onFlush(Runnable)
+     * @see #onClose(Runnable)
+     * @see #onError(Consumer)
+     */
+    boolean remove(Object action);
+
+    /**
+     * Specifies that the event object in an {@link #onEvent(Consumer)} action
+     * can be reused.
+     * <p>
+     * If reuse is set to {@code true), an action should not keep a reference
+     * to the event object after the action has completed.
+     *
+     * @param reuse {@code true} if an event object can be reused, {@code false}
+     * otherwise
+     */
+    void setReuse(boolean reuse);
+
+    /**
+     * Specifies that events arrives in chronological order, sorted by the time
+     * they were committed to the stream.
+     *
+     * @param ordered if event objects arrive in chronological order to
+     *        {@code #onEvent(Consumer)}
+     */
+    void setOrdered(boolean ordered);
+
+    /**
+     * Specifies the start time of the stream.
+     * <p>
+     * The start time must be set before starting the stream
+     *
+     * @param startTime the start time, not {@code null}
+     *
+     * @throws IllegalStateException if the stream is already started
+     *
+     * @see #start()
+     * @see #startAsync()
+     */
+    void setStartTime(Instant startTime);
+
+    /**
+     * Specifies the end time of the stream.
+     * <p>
+     * The end time must be set before starting the stream.
+     * <p>
+     * At end time, the stream is closed.
+     *
+     * @param endTime the end time, not {@code null}
+     *
+     * @throws IllegalStateException if the stream is already started
+     *
+     * @see #start()
+     * @see #startAsync()
+     */
+    void setEndTime(Instant endTime);
+
+    /**
+     * Start processing of actions.
+     * <p>
+     * Actions are performed in the current thread.
+     *
+     * @throws IllegalStateException if the stream is already started or closed
+     */
+    void start();
+
+    /**
+     * Start asynchronous processing of actions.
+     * <p>
+     * Actions are performed in a single separate thread.
+     *
+     * @throws IllegalStateException if the stream is already started or closed
+     */
+    void startAsync();
+
+    /**
+     * Blocks until all actions are completed, or the stream is closed, or the
+     * timeout occurs, or the current thread is interrupted, whichever happens
+     * first.
+     *
+     * @param timeout the maximum time to wait, not {@code null}
+     *
+     * @throws IllegalArgumentException if timeout is negative
+     * @throws InterruptedException if interrupted while waiting
+     *
+     * @see #start()
+     * @see #startAsync()
+     * @see Thread#interrupt()
+     */
+    void awaitTermination(Duration timeout) throws InterruptedException;
+
+    /**
+     * Blocks until all actions are completed, or the stream is closed, or the
+     * current thread is interrupted, whichever happens first.
+     *
+     * @throws InterruptedException if interrupted while waiting
+     *
+     * @see #start()
+     * @see #startAsync()
+     * @see Thread#interrupt()
+     */
+    void awaitTermination() throws InterruptedException;
+}