src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java
changeset 58863 c16ac7a2eba4
child 59226 a0f39cc47387
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java	Wed Oct 30 19:43:52 2019 +0100
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.jfr.internal.consumer;
+
+import java.io.IOException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import jdk.jfr.consumer.EventStream;
+import jdk.jfr.consumer.RecordedEvent;
+import jdk.jfr.internal.LogLevel;
+import jdk.jfr.internal.LogTag;
+import jdk.jfr.internal.Logger;
+import jdk.jfr.internal.SecuritySupport;
+
+/*
+ * Purpose of this class is to simplify the implementation of
+ * an event stream.
+ */
+abstract class AbstractEventStream implements EventStream {
+    private final static AtomicLong counter = new AtomicLong(1);
+
+    private final Object terminated = new Object();
+    private final boolean active;
+    private final Runnable flushOperation = () -> dispatcher().runFlushActions();
+    private final AccessControlContext accessControllerContext;
+    private final StreamConfiguration configuration = new StreamConfiguration();
+
+    private volatile Thread thread;
+    private Dispatcher dispatcher;
+
+    private volatile boolean closed;
+
+    AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
+        this.accessControllerContext = Objects.requireNonNull(acc);
+        this.active = active;
+    }
+
+    @Override
+    abstract public void start();
+
+    @Override
+    abstract public void startAsync();
+
+    @Override
+    abstract public void close();
+
+    protected final Dispatcher dispatcher() {
+        if (configuration.hasChanged()) {
+            synchronized (configuration) {
+                dispatcher = new Dispatcher(configuration);
+            }
+        }
+        return dispatcher;
+    }
+
+    @Override
+    public final void setOrdered(boolean ordered) {
+        configuration.setOrdered(ordered);
+    }
+
+    @Override
+    public final void setReuse(boolean reuse) {
+        configuration.setReuse(reuse);
+    }
+
+    @Override
+    public final void setStartTime(Instant startTime) {
+        Objects.nonNull(startTime);
+        synchronized (configuration) {
+            if (configuration.started) {
+                throw new IllegalStateException("Stream is already started");
+            }
+            if (startTime.isBefore(Instant.EPOCH)) {
+                startTime = Instant.EPOCH;
+            }
+            configuration.setStartTime(startTime);
+        }
+    }
+
+    @Override
+    public final void setEndTime(Instant endTime) {
+        Objects.requireNonNull(endTime);
+        synchronized (configuration) {
+            if (configuration.started) {
+                throw new IllegalStateException("Stream is already started");
+            }
+            configuration.setEndTime(endTime);
+        }
+    }
+
+    @Override
+    public final void onEvent(Consumer<RecordedEvent> action) {
+        Objects.requireNonNull(action);
+        configuration.addEventAction(action);
+    }
+
+    @Override
+    public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
+        Objects.requireNonNull(eventName);
+        Objects.requireNonNull(action);
+        configuration.addEventAction(eventName, action);
+    }
+
+    @Override
+    public final void onFlush(Runnable action) {
+        Objects.requireNonNull(action);
+        configuration.addFlushAction(action);
+    }
+
+    @Override
+    public final void onClose(Runnable action) {
+        Objects.requireNonNull(action);
+        configuration.addCloseAction(action);
+    }
+
+    @Override
+    public final void onError(Consumer<Throwable> action) {
+        Objects.requireNonNull(action);
+        configuration.addErrorAction(action);
+    }
+
+    @Override
+    public final boolean remove(Object action) {
+        Objects.requireNonNull(action);
+        return configuration.remove(action);
+    }
+
+    @Override
+    public final void awaitTermination() throws InterruptedException {
+        awaitTermination(Duration.ofMillis(0));
+    }
+
+    @Override
+    public final void awaitTermination(Duration timeout) throws InterruptedException {
+        Objects.requireNonNull(timeout);
+        if (timeout.isNegative()) {
+            throw new IllegalArgumentException("timeout value is negative");
+        }
+
+        long base = System.currentTimeMillis();
+        long now = 0;
+
+        long millis;
+        try {
+            millis = Math.multiplyExact(timeout.getSeconds(), 1000);
+        } catch (ArithmeticException a) {
+            millis = Long.MAX_VALUE;
+        }
+        int nanos = timeout.toNanosPart();
+        if (nanos == 0 && millis == 0) {
+            synchronized (terminated) {
+                while (!isClosed()) {
+                    terminated.wait(0);
+                }
+            }
+        } else {
+            while (!isClosed()) {
+                long delay = millis - now;
+                if (delay <= 0) {
+                    break;
+                }
+                synchronized (terminated) {
+                    terminated.wait(delay, nanos);
+                }
+                now = System.currentTimeMillis() - base;
+            }
+        }
+    }
+
+    protected abstract void process() throws IOException;
+
+    protected final void setClosed(boolean closed) {
+        this.closed = closed;
+    }
+
+    protected final boolean isClosed() {
+        return closed;
+    }
+
+    public final void startAsync(long startNanos) {
+        startInternal(startNanos);
+        Runnable r = () -> run(accessControllerContext);
+        thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r);
+        thread.start();
+    }
+
+    public final void start(long startNanos) {
+        startInternal(startNanos);
+        thread = Thread.currentThread();
+        run(accessControllerContext);
+    }
+
+    protected final Runnable getFlushOperation() {
+        return flushOperation;
+    }
+
+    private void startInternal(long startNanos) {
+        synchronized (configuration) {
+            if (configuration.started) {
+                throw new IllegalStateException("Event stream can only be started once");
+            }
+            if (active && configuration.startTime == null) {
+                configuration.setStartNanos(startNanos);
+            }
+            configuration.setStarted(true);
+        }
+    }
+
+    private void execute() {
+        try {
+            process();
+        } catch (IOException ioe) {
+            // This can happen if a chunk file is removed, or
+            // a file is access that has been closed
+            // This is "normal" behavior for streaming and the
+            // stream will be closed when this happens.
+        } finally {
+            Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
+            try {
+                close();
+            } finally {
+                synchronized (terminated) {
+                    terminated.notifyAll();
+                }
+            }
+        }
+    }
+
+    private void run(AccessControlContext accessControlContext) {
+        AccessController.doPrivileged(new PrivilegedAction<Void>() {
+            @Override
+            public Void run() {
+                execute();
+                return null;
+            }
+        }, accessControlContext);
+    }
+
+    private String nextThreadName() {
+        counter.incrementAndGet();
+        return "JFR Event Stream " + counter;
+    }
+}