diff -r 2c3cc4b01880 -r c16ac7a2eba4 src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java Wed Oct 30 19:43:52 2019 +0100 @@ -0,0 +1,273 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.jfr.internal.consumer; + +import java.io.IOException; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import jdk.jfr.consumer.EventStream; +import jdk.jfr.consumer.RecordedEvent; +import jdk.jfr.internal.LogLevel; +import jdk.jfr.internal.LogTag; +import jdk.jfr.internal.Logger; +import jdk.jfr.internal.SecuritySupport; + +/* + * Purpose of this class is to simplify the implementation of + * an event stream. + */ +abstract class AbstractEventStream implements EventStream { + private final static AtomicLong counter = new AtomicLong(1); + + private final Object terminated = new Object(); + private final boolean active; + private final Runnable flushOperation = () -> dispatcher().runFlushActions(); + private final AccessControlContext accessControllerContext; + private final StreamConfiguration configuration = new StreamConfiguration(); + + private volatile Thread thread; + private Dispatcher dispatcher; + + private volatile boolean closed; + + AbstractEventStream(AccessControlContext acc, boolean active) throws IOException { + this.accessControllerContext = Objects.requireNonNull(acc); + this.active = active; + } + + @Override + abstract public void start(); + + @Override + abstract public void startAsync(); + + @Override + abstract public void close(); + + protected final Dispatcher dispatcher() { + if (configuration.hasChanged()) { + synchronized (configuration) { + dispatcher = new Dispatcher(configuration); + } + } + return dispatcher; + } + + @Override + public final void setOrdered(boolean ordered) { + configuration.setOrdered(ordered); + } + + @Override + public final void setReuse(boolean reuse) { + configuration.setReuse(reuse); + } + + @Override + public final void setStartTime(Instant startTime) { + Objects.nonNull(startTime); + synchronized (configuration) { + if (configuration.started) { + throw new IllegalStateException("Stream is already started"); + } + if (startTime.isBefore(Instant.EPOCH)) { + startTime = Instant.EPOCH; + } + configuration.setStartTime(startTime); + } + } + + @Override + public final void setEndTime(Instant endTime) { + Objects.requireNonNull(endTime); + synchronized (configuration) { + if (configuration.started) { + throw new IllegalStateException("Stream is already started"); + } + configuration.setEndTime(endTime); + } + } + + @Override + public final void onEvent(Consumer action) { + Objects.requireNonNull(action); + configuration.addEventAction(action); + } + + @Override + public final void onEvent(String eventName, Consumer action) { + Objects.requireNonNull(eventName); + Objects.requireNonNull(action); + configuration.addEventAction(eventName, action); + } + + @Override + public final void onFlush(Runnable action) { + Objects.requireNonNull(action); + configuration.addFlushAction(action); + } + + @Override + public final void onClose(Runnable action) { + Objects.requireNonNull(action); + configuration.addCloseAction(action); + } + + @Override + public final void onError(Consumer action) { + Objects.requireNonNull(action); + configuration.addErrorAction(action); + } + + @Override + public final boolean remove(Object action) { + Objects.requireNonNull(action); + return configuration.remove(action); + } + + @Override + public final void awaitTermination() throws InterruptedException { + awaitTermination(Duration.ofMillis(0)); + } + + @Override + public final void awaitTermination(Duration timeout) throws InterruptedException { + Objects.requireNonNull(timeout); + if (timeout.isNegative()) { + throw new IllegalArgumentException("timeout value is negative"); + } + + long base = System.currentTimeMillis(); + long now = 0; + + long millis; + try { + millis = Math.multiplyExact(timeout.getSeconds(), 1000); + } catch (ArithmeticException a) { + millis = Long.MAX_VALUE; + } + int nanos = timeout.toNanosPart(); + if (nanos == 0 && millis == 0) { + synchronized (terminated) { + while (!isClosed()) { + terminated.wait(0); + } + } + } else { + while (!isClosed()) { + long delay = millis - now; + if (delay <= 0) { + break; + } + synchronized (terminated) { + terminated.wait(delay, nanos); + } + now = System.currentTimeMillis() - base; + } + } + } + + protected abstract void process() throws IOException; + + protected final void setClosed(boolean closed) { + this.closed = closed; + } + + protected final boolean isClosed() { + return closed; + } + + public final void startAsync(long startNanos) { + startInternal(startNanos); + Runnable r = () -> run(accessControllerContext); + thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r); + thread.start(); + } + + public final void start(long startNanos) { + startInternal(startNanos); + thread = Thread.currentThread(); + run(accessControllerContext); + } + + protected final Runnable getFlushOperation() { + return flushOperation; + } + + private void startInternal(long startNanos) { + synchronized (configuration) { + if (configuration.started) { + throw new IllegalStateException("Event stream can only be started once"); + } + if (active && configuration.startTime == null) { + configuration.setStartNanos(startNanos); + } + configuration.setStarted(true); + } + } + + private void execute() { + try { + process(); + } catch (IOException ioe) { + // This can happen if a chunk file is removed, or + // a file is access that has been closed + // This is "normal" behavior for streaming and the + // stream will be closed when this happens. + } finally { + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); + try { + close(); + } finally { + synchronized (terminated) { + terminated.notifyAll(); + } + } + } + } + + private void run(AccessControlContext accessControlContext) { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + execute(); + return null; + } + }, accessControlContext); + } + + private String nextThreadName() { + counter.incrementAndGet(); + return "JFR Event Stream " + counter; + } +}