--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java Wed Oct 30 19:43:52 2019 +0100
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.internal.consumer;
+
+import java.io.IOException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import jdk.jfr.consumer.EventStream;
+import jdk.jfr.consumer.RecordedEvent;
+import jdk.jfr.internal.LogLevel;
+import jdk.jfr.internal.LogTag;
+import jdk.jfr.internal.Logger;
+import jdk.jfr.internal.SecuritySupport;
+
+/*
+ * Purpose of this class is to simplify the implementation of
+ * an event stream.
+ */
+abstract class AbstractEventStream implements EventStream {
+ private final static AtomicLong counter = new AtomicLong(1);
+
+ private final Object terminated = new Object();
+ private final boolean active;
+ private final Runnable flushOperation = () -> dispatcher().runFlushActions();
+ private final AccessControlContext accessControllerContext;
+ private final StreamConfiguration configuration = new StreamConfiguration();
+
+ private volatile Thread thread;
+ private Dispatcher dispatcher;
+
+ private volatile boolean closed;
+
+ AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
+ this.accessControllerContext = Objects.requireNonNull(acc);
+ this.active = active;
+ }
+
+ @Override
+ abstract public void start();
+
+ @Override
+ abstract public void startAsync();
+
+ @Override
+ abstract public void close();
+
+ protected final Dispatcher dispatcher() {
+ if (configuration.hasChanged()) {
+ synchronized (configuration) {
+ dispatcher = new Dispatcher(configuration);
+ }
+ }
+ return dispatcher;
+ }
+
+ @Override
+ public final void setOrdered(boolean ordered) {
+ configuration.setOrdered(ordered);
+ }
+
+ @Override
+ public final void setReuse(boolean reuse) {
+ configuration.setReuse(reuse);
+ }
+
+ @Override
+ public final void setStartTime(Instant startTime) {
+ Objects.nonNull(startTime);
+ synchronized (configuration) {
+ if (configuration.started) {
+ throw new IllegalStateException("Stream is already started");
+ }
+ if (startTime.isBefore(Instant.EPOCH)) {
+ startTime = Instant.EPOCH;
+ }
+ configuration.setStartTime(startTime);
+ }
+ }
+
+ @Override
+ public final void setEndTime(Instant endTime) {
+ Objects.requireNonNull(endTime);
+ synchronized (configuration) {
+ if (configuration.started) {
+ throw new IllegalStateException("Stream is already started");
+ }
+ configuration.setEndTime(endTime);
+ }
+ }
+
+ @Override
+ public final void onEvent(Consumer<RecordedEvent> action) {
+ Objects.requireNonNull(action);
+ configuration.addEventAction(action);
+ }
+
+ @Override
+ public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
+ Objects.requireNonNull(eventName);
+ Objects.requireNonNull(action);
+ configuration.addEventAction(eventName, action);
+ }
+
+ @Override
+ public final void onFlush(Runnable action) {
+ Objects.requireNonNull(action);
+ configuration.addFlushAction(action);
+ }
+
+ @Override
+ public final void onClose(Runnable action) {
+ Objects.requireNonNull(action);
+ configuration.addCloseAction(action);
+ }
+
+ @Override
+ public final void onError(Consumer<Throwable> action) {
+ Objects.requireNonNull(action);
+ configuration.addErrorAction(action);
+ }
+
+ @Override
+ public final boolean remove(Object action) {
+ Objects.requireNonNull(action);
+ return configuration.remove(action);
+ }
+
+ @Override
+ public final void awaitTermination() throws InterruptedException {
+ awaitTermination(Duration.ofMillis(0));
+ }
+
+ @Override
+ public final void awaitTermination(Duration timeout) throws InterruptedException {
+ Objects.requireNonNull(timeout);
+ if (timeout.isNegative()) {
+ throw new IllegalArgumentException("timeout value is negative");
+ }
+
+ long base = System.currentTimeMillis();
+ long now = 0;
+
+ long millis;
+ try {
+ millis = Math.multiplyExact(timeout.getSeconds(), 1000);
+ } catch (ArithmeticException a) {
+ millis = Long.MAX_VALUE;
+ }
+ int nanos = timeout.toNanosPart();
+ if (nanos == 0 && millis == 0) {
+ synchronized (terminated) {
+ while (!isClosed()) {
+ terminated.wait(0);
+ }
+ }
+ } else {
+ while (!isClosed()) {
+ long delay = millis - now;
+ if (delay <= 0) {
+ break;
+ }
+ synchronized (terminated) {
+ terminated.wait(delay, nanos);
+ }
+ now = System.currentTimeMillis() - base;
+ }
+ }
+ }
+
+ protected abstract void process() throws IOException;
+
+ protected final void setClosed(boolean closed) {
+ this.closed = closed;
+ }
+
+ protected final boolean isClosed() {
+ return closed;
+ }
+
+ public final void startAsync(long startNanos) {
+ startInternal(startNanos);
+ Runnable r = () -> run(accessControllerContext);
+ thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r);
+ thread.start();
+ }
+
+ public final void start(long startNanos) {
+ startInternal(startNanos);
+ thread = Thread.currentThread();
+ run(accessControllerContext);
+ }
+
+ protected final Runnable getFlushOperation() {
+ return flushOperation;
+ }
+
+ private void startInternal(long startNanos) {
+ synchronized (configuration) {
+ if (configuration.started) {
+ throw new IllegalStateException("Event stream can only be started once");
+ }
+ if (active && configuration.startTime == null) {
+ configuration.setStartNanos(startNanos);
+ }
+ configuration.setStarted(true);
+ }
+ }
+
+ private void execute() {
+ try {
+ process();
+ } catch (IOException ioe) {
+ // This can happen if a chunk file is removed, or
+ // a file is access that has been closed
+ // This is "normal" behavior for streaming and the
+ // stream will be closed when this happens.
+ } finally {
+ Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
+ try {
+ close();
+ } finally {
+ synchronized (terminated) {
+ terminated.notifyAll();
+ }
+ }
+ }
+ }
+
+ private void run(AccessControlContext accessControlContext) {
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ execute();
+ return null;
+ }
+ }, accessControlContext);
+ }
+
+ private String nextThreadName() {
+ counter.incrementAndGet();
+ return "JFR Event Stream " + counter;
+ }
+}