Clean up class hiercharchy JEP-349-branch
authoregahlin
Mon, 02 Sep 2019 21:08:41 +0200
branchJEP-349-branch
changeset 57985 be121cbf3284
parent 57984 269bbe414580
child 57987 23e3cd901cb6
Clean up class hiercharchy
src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Mon Sep 02 21:08:41 2019 +0200
@@ -45,6 +45,7 @@
 import jdk.jfr.internal.LogTag;
 import jdk.jfr.internal.Logger;
 import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.SecuritySupport;
 import jdk.jfr.internal.Utils;
 import jdk.jfr.internal.consumer.InternalEventFilter;
 
@@ -59,18 +60,18 @@
  * - security
  *
  */
-abstract class AbstractEventStream implements Runnable {
+abstract class AbstractEventStream implements EventStream {
 
-    public static final class StreamConfiguration {
+    protected static final class StreamConfiguration {
         private static final Runnable[] NO_ACTIONS = new Runnable[0];
 
+        private Consumer<?>[] errorActions = new Consumer<?>[0];
         private Runnable[] flushActions = NO_ACTIONS;
         private Runnable[] closeActions = NO_ACTIONS;
-        private Runnable[] errorActions = NO_ACTIONS;
-
-        private EventDispatcher[] dispatchers = NO_DISPATCHERS;
+        private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
         private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
         private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+
         private boolean changedConfiguration = false;
         private boolean closed = false;
         private boolean reuse = true;
@@ -84,6 +85,7 @@
         public StreamConfiguration(StreamConfiguration configuration) {
             this.flushActions = configuration.flushActions;
             this.closeActions = configuration.closeActions;
+            this.errorActions = configuration.errorActions;
             this.dispatchers = configuration.dispatchers;
             this.eventFilter = configuration.eventFilter;
             this.closed = configuration.closed;
@@ -100,50 +102,50 @@
         public StreamConfiguration() {
         }
 
-        final public StreamConfiguration remove(Object action) {
+        public StreamConfiguration remove(Object action) {
             flushActions = remove(flushActions, action);
             closeActions = remove(closeActions, action);
             dispatchers = removeDispatch(dispatchers, action);
             return this;
         }
 
-        final public StreamConfiguration addDispatcher(EventDispatcher e) {
+        public StreamConfiguration addDispatcher(EventDispatcher e) {
             dispatchers = add(dispatchers, e);
             eventFilter = buildFilter(dispatchers);
             dispatcherLookup = new LongMap<>();
             return this;
         }
 
-        final public StreamConfiguration addFlushAction(Runnable action) {
+        public StreamConfiguration addFlushAction(Runnable action) {
             flushActions = add(flushActions, action);
             return this;
         }
 
-        final public StreamConfiguration addCloseAction(Runnable action) {
+        public StreamConfiguration addCloseAction(Runnable action) {
             closeActions = add(closeActions, action);
             return this;
         }
 
-        public StreamConfiguration addErrorAction(Runnable action) {
+        public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
             errorActions = add(errorActions, action);
             return this;
         }
 
-        final public StreamConfiguration setClosed(boolean closed) {
+        public StreamConfiguration setClosed(boolean closed) {
             this.closed = closed;
             changedConfiguration = true;
             return this;
         }
 
-        final public boolean isClosed() {
+        public boolean isClosed() {
             return closed;
         }
 
-        final public Runnable[] getCloseActions() {
+        public Runnable[] getCloseActions() {
             return closeActions;
         }
 
-        final public Runnable[] getFlushActions() {
+        public Runnable[] getFlushActions() {
             return flushActions;
         }
 
@@ -197,13 +199,13 @@
             return ef;
         }
 
-        final public StreamConfiguration setReuse(boolean reuse) {
+        public StreamConfiguration setReuse(boolean reuse) {
             this.reuse = reuse;
             changedConfiguration = true;
             return this;
         }
 
-        final public StreamConfiguration setOrdered(boolean ordered) {
+        public StreamConfiguration setOrdered(boolean ordered) {
             this.ordered = ordered;
             changedConfiguration = true;
             return this;
@@ -216,14 +218,14 @@
             return this;
         }
 
-        final public StreamConfiguration setStartTime(Instant startTime) {
+        public StreamConfiguration setStartTime(Instant startTime) {
             this.startTime = startTime;
             this.startNanos = Utils.timeToNanos(startTime);
             changedConfiguration = true;
             return this;
         }
 
-        final public Instant getStartTime() {
+        public Instant getStartTime() {
             return startTime;
         }
 
@@ -231,50 +233,50 @@
             return endTime;
         }
 
-        final public boolean isStarted() {
+        public boolean isStarted() {
             return started;
         }
 
-        final public StreamConfiguration setStartNanos(long startNanos) {
+        public StreamConfiguration setStartNanos(long startNanos) {
             this.startNanos = startNanos;
             changedConfiguration = true;
             return this;
         }
 
-        final public void setStarted(boolean started) {
+        public void setStarted(boolean started) {
             this.started = started;
             changedConfiguration = true;
         }
 
-        final public boolean hasChanged() {
+        public boolean hasChanged() {
             return changedConfiguration;
         }
 
-        final public boolean getReuse() {
+        public boolean getReuse() {
             return reuse;
         }
 
-        final public boolean getOrdered() {
+        public boolean getOrdered() {
             return ordered;
         }
 
-        final public InternalEventFilter getFiler() {
+        public InternalEventFilter getFiler() {
             return eventFilter;
         }
 
-        final public long getStartNanos() {
+        public long getStartNanos() {
             return startNanos;
         }
 
-        final public long getEndNanos() {
+        public long getEndNanos() {
             return endNanos;
         }
 
-        final public InternalEventFilter getFilter() {
+        public InternalEventFilter getFilter() {
             return eventFilter;
         }
 
-        final public String toString() {
+        public String toString() {
             StringBuilder sb = new StringBuilder();
             for (Runnable flush : flushActions) {
                 sb.append("Flush Action: ").append(flush).append("\n");
@@ -282,6 +284,9 @@
             for (Runnable close : closeActions) {
                 sb.append("Close Action: " + close + "\n");
             }
+            for (Consumer<?> error : errorActions) {
+                sb.append("Error Action: " + error + "\n");
+            }
             for (EventDispatcher dispatcher : dispatchers) {
                 sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
             }
@@ -301,9 +306,8 @@
         }
     }
 
-    final static class EventDispatcher {
+    private final static class EventDispatcher {
         final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-
         final private String eventName;
         final private Consumer<RecordedEvent> action;
 
@@ -325,13 +329,13 @@
         }
     }
 
-    public final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
+    final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
 
-    private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-    private final AccessControlContext accessControlContext;
     private final Thread thread;
     private final boolean active;
-    protected final Runnable flushOperation = () -> runFlushActions();
+    private final Runnable flushOperation = () -> runFlushActions();
+    private final AccessControlContext accessControllerContext;
+    private final Object configurationLock = new Object();
 
     // Modified by updateConfiguration()
     protected volatile StreamConfiguration configuration = new StreamConfiguration();
@@ -341,21 +345,231 @@
     private EventDispatcher[] lastEventDispatch;
 
     public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
-        this.accessControlContext = acc;
+        this.accessControllerContext = Objects.requireNonNull(acc);
         this.active = active;
-        // Create thread object in constructor to ensure caller has
-        // permission before constructing object
-        thread = new Thread(this);
+        this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc));
+    }
+
+    @Override
+    abstract public void start();
+
+    @Override
+    abstract public void startAsync();
+
+    @Override
+    abstract public void close();
+
+    // Purpose of synchronizing the following methods is
+    // to serialize changes to the configuration, so only one
+    // thread at a time can change the configuration.
+    //
+    // The purpose is not to guard the configuration field. A new
+    // configuration is published using updateConfiguration
+    //
+    @Override
+    public final void setOrdered(boolean ordered) {
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
+        }
+    }
+
+    @Override
+    public final void setReuse(boolean reuse) {
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
+        }
+    }
+
+    @Override
+    public final void setStartTime(Instant startTime) {
+        Objects.nonNull(startTime);
+        synchronized (configurationLock) {
+            if (configuration.isStarted()) {
+                throw new IllegalStateException("Stream is already started");
+            }
+            if (startTime.isBefore(Instant.EPOCH)) {
+                startTime = Instant.EPOCH;
+            }
+            updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
+        }
+    }
+
+    @Override
+    public final void setEndTime(Instant endTime) {
+        Objects.requireNonNull(endTime);
+        synchronized (configurationLock) {
+            if (configuration.isStarted()) {
+                throw new IllegalStateException("Stream is already started");
+            }
+            updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
+        }
+    }
+
+    @Override
+    public final void onEvent(Consumer<RecordedEvent> action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            add(new EventDispatcher(action));
+        }
+    }
+
+    @Override
+    public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
+        Objects.requireNonNull(eventName);
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            add(new EventDispatcher(eventName, action));
+        }
+    }
+
+    @Override
+    public final void onFlush(Runnable action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
+        }
+    }
+
+    @Override
+    public final void onClose(Runnable action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
+        }
+    }
+
+    @Override
+    public final void onError(Consumer<Throwable> action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
+        }
+    }
+
+    @Override
+    public final boolean remove(Object action) {
+        Objects.requireNonNull(action);
+        synchronized (configurationLock) {
+            return updateConfiguration(new StreamConfiguration(configuration).remove(action));
+        }
     }
 
-    public final void run() {
-        AccessController.doPrivileged(new PrivilegedAction<Void>() {
-            @Override
-            public Void run() {
-                execute();
-                return null;
+    @Override
+    public final void awaitTermination() {
+        awaitTermination(Duration.ofMillis(0));
+    }
+
+    @Override
+    public final void awaitTermination(Duration timeout) {
+        Objects.requireNonNull(timeout);
+        if (thread != Thread.currentThread()) {
+            try {
+                thread.join(timeout.toMillis());
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    protected abstract void process() throws Exception;
+
+    protected final void clearLastDispatch() {
+        lastEventDispatch = null;
+        lastEventType = null;
+    }
+
+    protected final void dispatch(RecordedEvent event) {
+        EventType type = event.getEventType();
+        EventDispatcher[] dispatchers = null;
+        if (type == lastEventType) {
+            dispatchers = lastEventDispatch;
+        } else {
+            dispatchers = configuration.dispatcherLookup.get(type.getId());
+            if (dispatchers == null) {
+                List<EventDispatcher> list = new ArrayList<>();
+                for (EventDispatcher e : configuration.getDispatchers()) {
+                    if (e.accepts(type)) {
+                        list.add(e);
+                    }
+                }
+                dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
+                configuration.dispatcherLookup.put(type.getId(), dispatchers);
+            }
+            lastEventDispatch = dispatchers;
+        }
+        for (int i = 0; i < dispatchers.length; i++) {
+            try {
+                dispatchers[i].offer(event);
+            } catch (Exception e) {
+                handleError(e);
             }
-        }, accessControlContext);
+        }
+    }
+
+    protected final void runCloseActions() {
+        Runnable[] closeActions = configuration.getCloseActions();
+        for (int i = 0; i < closeActions.length; i++) {
+            try {
+                closeActions[i].run();
+            } catch (Exception e) {
+                handleError(e);
+            }
+        }
+    }
+
+    protected final void setClosed(boolean closed) {
+        synchronized (configurationLock) {
+            updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
+        }
+    }
+
+    protected final boolean isClosed() {
+        return configuration.isClosed();
+    }
+
+    protected final void startAsync(long startNanos) {
+        startInternal(startNanos);
+        thread.start();
+    }
+
+    protected final void start(long startNanos) {
+        startInternal(startNanos);
+        run(accessControllerContext);
+    }
+
+    protected final Runnable getFlushOperation() {
+        return flushOperation;
+    }
+
+    private void add(EventDispatcher e) {
+        updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
+    }
+
+    private boolean updateConfiguration(StreamConfiguration newConfiguration) {
+        if (!Thread.holdsLock(configurationLock)) {
+            throw new InternalError("Modification of configuration without proper lock");
+        }
+        if (newConfiguration.hasChanged()) {
+            // Publish objects held by configuration object
+            VarHandle.releaseFence();
+            configuration = newConfiguration;
+            return true;
+        }
+        return false;
+    }
+
+    private void startInternal(long startNanos) {
+        synchronized (configurationLock) {
+            if (configuration.isStarted()) {
+                throw new IllegalStateException("Event stream can only be started once");
+            }
+            StreamConfiguration c = new StreamConfiguration(configuration);
+            if (active) {
+                c.setStartNanos(startNanos);
+            }
+            c.setStarted(true);
+            updateConfiguration(c);
+        }
     }
 
     private void execute() {
@@ -369,202 +583,41 @@
         }
     }
 
-    public abstract void process() throws Exception;
-
-    protected final void clearLastDispatch() {
-        lastEventDispatch = null;
-        lastEventType = null;
-    }
-
-    protected final void dispatch(RecordedEvent event) {
-        EventType type = event.getEventType();
-        EventDispatcher[] ret = null;
-        if (type == lastEventType) {
-            ret = lastEventDispatch;
-        } else {
-            ret = configuration.dispatcherLookup.get(type.getId());
-            if (ret == null) {
-                List<EventDispatcher> list = new ArrayList<>();
-                for (EventDispatcher e : configuration.getDispatchers()) {
-                    if (e.accepts(type)) {
-                        list.add(e);
-                    }
-                }
-                ret = list.isEmpty() ? NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
-                configuration.dispatcherLookup.put(type.getId(), ret);
-            }
-            lastEventDispatch = ret;
+    private void handleError(Throwable e) {
+        Consumer<?>[] consumers = configuration.errorActions;
+        if (consumers.length == 0) {
+            defaultErrorHandler(e);
+            return;
         }
-        for (int i = 0; i < ret.length; i++) {
-            try {
-                ret[i].offer(event);
-            } catch (Exception e) {
-                handleError(e);
-            }
+        for (int i = 0; i < consumers.length; i++) {
+            @SuppressWarnings("unchecked")
+            Consumer<Throwable> c = (Consumer<Throwable>) consumers[i];
+            c.accept(e);
         }
     }
 
-    protected final void handleError(Throwable e) {
-        StreamConfiguration c = configuration;
-        if (c.errorActions.length == 0) {
-            defaultErrorHandler(e);
-            return;
-        }
-        for (Runnable r : c.errorActions) {
-            r.run();
-        }
-    }
-
-    protected final void defaultErrorHandler(Throwable e) {
+    private void defaultErrorHandler(Throwable e) {
         e.printStackTrace();
     }
 
-    public final void runCloseActions() {
-        Runnable[] cas = configuration.getCloseActions();
-        for (int i = 0; i < cas.length; i++) {
+    private void runFlushActions() {
+        Runnable[] flushActions = configuration.getFlushActions();
+        for (int i = 0; i < flushActions.length; i++) {
             try {
-                cas[i].run();
+                flushActions[i].run();
             } catch (Exception e) {
                 handleError(e);
             }
         }
     }
 
-    public final void runFlushActions() {
-        Runnable[] fas = configuration.getFlushActions();
-        for (int i = 0; i < fas.length; i++) {
-            try {
-                fas[i].run();
-            } catch (Exception e) {
-                handleError(e);
+    private void run(AccessControlContext acc) {
+        AccessController.doPrivileged(new PrivilegedAction<Void>() {
+            @Override
+            public Void run() {
+                execute();
+                return null;
             }
-        }
-
-    }
-
-    // Purpose of synchronizing the following methods is
-    // to serialize changes to the configuration, so only one
-    // thread at a time can change the configuration.
-    //
-    // The purpose is not to guard the configuration field. A new
-    // configuration is published using updateConfiguration
-    //
-    public final synchronized boolean remove(Object action) {
-        return updateConfiguration(new StreamConfiguration(configuration).remove(action));
-    }
-
-    public final synchronized void onEvent(Consumer<RecordedEvent> action) {
-        add(new EventDispatcher(action));
-    }
-
-    public final synchronized void onEvent(String eventName, Consumer<RecordedEvent> action) {
-        add(new EventDispatcher(eventName, action));
-    }
-
-    private final synchronized void add(EventDispatcher e) {
-        updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
-    }
-
-    public final synchronized void onFlush(Runnable action) {
-        updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
-    }
-
-    public final synchronized void addCloseAction(Runnable action) {
-        updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
-    }
-
-    public final synchronized void addErrorAction(Runnable action) {
-        updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
-    }
-
-    public final synchronized void setClosed(boolean closed) {
-        updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
-    }
-
-    public final synchronized void setReuse(boolean reuse) {
-        updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
-    }
-
-    public final synchronized void setOrdered(boolean ordered) {
-        updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
-    }
-
-    public final synchronized void setStartNanos(long startNanos) {
-        updateConfiguration(new StreamConfiguration(configuration).setStartNanos(startNanos));
+        }, acc);
     }
-
-    public final synchronized void setStartTime(Instant startTime) {
-        Objects.nonNull(startTime);
-        if (configuration.isStarted()) {
-            throw new IllegalStateException("Stream is already started");
-        }
-        if (startTime.isBefore(Instant.EPOCH)) {
-            startTime = Instant.EPOCH;
-        }
-        updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
-    }
-
-    public final synchronized void setEndTime(Instant endTime) {
-        if (configuration.isStarted()) {
-            throw new IllegalStateException("Stream is already started");
-        }
-        updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
-    }
-
-    protected boolean updateConfiguration(StreamConfiguration newConfiguration) {
-        if (!Thread.holdsLock(this)) {
-            throw new InternalError("Modification of configuration without proper lock");
-        }
-        if (newConfiguration.hasChanged()) {
-            // Publish objects held by configuration object
-            VarHandle.releaseFence();
-            configuration = newConfiguration;
-            return true;
-        }
-        return false;
-    }
-
-    public final boolean isClosed() {
-        return configuration.isClosed();
-    }
-
-    public final void startAsync(long startNanos) {
-        startInternal(startNanos);
-        thread.start();
-    }
-
-    public final void start(long startNanos) {
-        startInternal(startNanos);
-        run();
-    }
-
-    private synchronized void startInternal(long startNanos) {
-        if (configuration.isStarted()) {
-            throw new IllegalStateException("Event stream can only be started once");
-        }
-        StreamConfiguration c = new StreamConfiguration(configuration);
-        if (active) {
-            c.setStartNanos(startNanos);
-        }
-        c.setStarted(true);
-        updateConfiguration(c);
-    }
-
-    public final void awaitTermination(Duration timeout) {
-        Objects.requireNonNull(timeout);
-        if (thread != Thread.currentThread()) {
-            try {
-                thread.join(timeout.toMillis());
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-    }
-
-    public final void awaitTermination() {
-        awaitTermination(Duration.ofMillis(0));
-    }
-
-    abstract public void close();
-
 }
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Mon Sep 02 21:08:41 2019 +0200
@@ -74,8 +74,8 @@
     private boolean reuse;
     private boolean ordered;
     private boolean resetEventCache;
-    private long firstNanos = 0;
-    private long lastNanos = Long.MAX_VALUE;
+    private long filterStart = 0;
+    private long filterEnd = Long.MAX_VALUE;
     private Runnable flushOperation;
 
     public ChunkParser(RecordingInput input, boolean reuse) throws IOException {
@@ -398,17 +398,17 @@
 
     // Need to call updateEventParsers() for
     // change to take effect
-    public void setFirstNanos(long firstNanos) {
+    public void setFilterStart(long filterStart) {
         long chunkStart = chunkHeader.getStartNanos();
         // Optimization.
-        if (firstNanos < chunkStart - 1_000_000_000L) {
-            firstNanos = 0;
+        if (filterStart < chunkStart - 1_000_000_000L) {
+            filterStart = 0;
         }
-        this.firstNanos = firstNanos;
+        this.filterStart = filterStart;
     }
 
-    public void setLastNanos(long lastNanos) {
-        this.lastNanos = lastNanos;
+    public void setFilterEnd(long filterEnd) {
+        this.filterEnd = filterEnd;
     }
 
     // Need to call updateEventParsers() for
@@ -424,8 +424,8 @@
                 String name = ep.getEventType().getName();
                 ep.setOrdered(ordered);
                 ep.setReuse(reuse);
-                ep.setFirstNanos(firstNanos);
-                ep.setLastNanos(lastNanos);
+                ep.setFilterStart(filterStart);
+                ep.setFilterEnd(filterEnd);
                 if (resetEventCache) {
                     ep.resetCache();
                 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java	Mon Sep 02 21:08:41 2019 +0200
@@ -38,5 +38,4 @@
     public Object getCurrent(long key) {
         return current.get(key);
     }
-
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Mon Sep 02 21:08:41 2019 +0200
@@ -28,12 +28,9 @@
 import java.io.IOException;
 import java.nio.file.Path;
 import java.security.AccessControlContext;
-import java.time.Duration;
 import java.time.Instant;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Objects;
-import java.util.function.Consumer;
 
 import jdk.jfr.internal.Utils;
 import jdk.jfr.internal.consumer.FileAccess;
@@ -45,156 +42,26 @@
  * with chunk files.
  *
  */
-class EventDirectoryStream implements EventStream {
-
-    static final class DirectoryStream extends AbstractEventStream {
-
-        private static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
-        private static final int DEFAULT_ARRAY_SIZE = 10_000;
-
-        private final RepositoryFiles repositoryFiles;
-        private final boolean active;
-        private final FileAccess fileAccess;
-        private ChunkParser chunkParser;
-        private RecordedEvent[] sortedList;
-        protected long chunkStartNanos;
-
-        public DirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
-            super(acc, active);
-            this.fileAccess = fileAccess;
-            this.active = active;
-            repositoryFiles = new RepositoryFiles(fileAccess, p);
-        }
-
-        @Override
-        public void process() throws Exception {
-            final StreamConfiguration c1 = configuration;
-            Path path;
-            boolean validStartTime = active || c1.getStartTime() != null;
-            if (validStartTime) {
-                path = repositoryFiles.firstPath(c1.getStartNanos());
-            } else {
-                path = repositoryFiles.lastPath();
-            }
-            if (path == null) { // closed
-                return;
-            }
-            chunkStartNanos = repositoryFiles.getTimestamp(path);
-            try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
-                chunkParser = new ChunkParser(input, c1.getReuse());
-                long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
-                long start = validStartTime ? c1.getStartNanos() : segmentStart;
-                long end = c1.getEndTime() != null ? c1.getEndNanos() : Long.MAX_VALUE;
-                while (!isClosed()) {
-                    boolean awaitnewEvent = false;
-                    while (!isClosed() && !chunkParser.isChunkFinished()) {
-                        final StreamConfiguration c2 = configuration;
-                        boolean ordered = c2.getOrdered();
-                        chunkParser.setFlushOperation(flushOperation);
-                        chunkParser.setReuse(c2.getReuse());
-                        chunkParser.setOrdered(ordered);
-                        chunkParser.setFirstNanos(start);
-                        chunkParser.setLastNanos(end);
-                        chunkParser.resetEventCache();
-                        chunkParser.setParserFilter(c2.getFilter());
-                        chunkParser.updateEventParsers();
-                        clearLastDispatch();
-                        if (ordered) {
-                            awaitnewEvent = processOrdered(awaitnewEvent);
-                        } else {
-                            awaitnewEvent = processUnordered(awaitnewEvent);
-                        }
-                        if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > end) {
-                            close();
-                            return;
-                        }
-                    }
-
+final class EventDirectoryStream extends AbstractEventStream {
+    private final RepositoryFiles repositoryFiles;
+    private final boolean active;
+    private final FileAccess fileAccess;
+    private ChunkParser chunkParser;
+    private long chunkStartNanos;
+    private RecordedEvent[] sortedList;
 
-                    if (isClosed()) {
-                        return;
-                    }
-                    long durationNanos = chunkParser.getChunkDuration();
-                    path = repositoryFiles.nextPath(chunkStartNanos + durationNanos);
-                    if (path == null) {
-                        return; // stream closed
-                    }
-                    chunkStartNanos = repositoryFiles.getTimestamp(path);
-                    input.setFile(path);
-                    chunkParser = chunkParser.newChunkParser();
-                    // No need filter when we reach new chunk
-                    // start = 0;
-                }
-            }
-        }
-
-        private boolean processOrdered(boolean awaitNewEvents) throws IOException {
-            if (sortedList == null) {
-                sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
-            }
-            int index = 0;
-            while (true) {
-                RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
-                if (e == null) {
-                    // wait for new event with next call to
-                    // readStreamingEvent()
-                    awaitNewEvents = true;
-                    break;
-                }
-                awaitNewEvents = false;
-                if (index == sortedList.length) {
-                    sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
-                }
-                sortedList[index++] = e;
-            }
-
-            // no events found
-            if (index == 0 && chunkParser.isChunkFinished()) {
-                return awaitNewEvents;
-            }
-            // at least 2 events, sort them
-            if (index > 1) {
-                Arrays.sort(sortedList, 0, index, END_TIME);
-            }
-            for (int i = 0; i < index; i++) {
-                dispatch(sortedList[i]);
-            }
-            return awaitNewEvents;
-        }
-
-        private boolean processUnordered(boolean awaitNewEvents) throws IOException {
-            while (true) {
-                RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
-                if (e == null) {
-                    return true;
-                } else {
-                    dispatch(e);
-                }
-            }
-        }
-
-        @Override
-        public void close() {
-            setClosed(true);
-            repositoryFiles.close();
-        }
-    }
-
-    private final AbstractEventStream eventStream;
-
-    public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess access, boolean active) throws IOException {
-        eventStream = new DirectoryStream(acc, p, access, active);
+    public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
+        super(acc, active);
+        this.fileAccess = Objects.requireNonNull(fileAccess);
+        this.active = active;
+        this.repositoryFiles = new RepositoryFiles(fileAccess, p);
     }
 
     @Override
     public void close() {
-        eventStream.close();
-    }
-
-    @Override
-    public void onFlush(Runnable action) {
-        Objects.requireNonNull(action);
-        eventStream.onFlush(action);
+        setClosed(true);
+        runCloseActions();
+        repositoryFiles.close();
     }
 
     @Override
@@ -208,75 +75,108 @@
     }
 
     @Override
-    public void onEvent(Consumer<RecordedEvent> action) {
-        Objects.requireNonNull(action);
-        eventStream.onEvent(action);
-    }
-
-    @Override
-    public void onEvent(String eventName, Consumer<RecordedEvent> action) {
-        Objects.requireNonNull(eventName);
-        Objects.requireNonNull(action);
-        eventStream.onEvent(eventName, action);
-    }
+    public void process() throws Exception {
+        StreamConfiguration c = configuration;
+        Path path;
+        boolean validStartTime = active || c.getStartTime() != null;
+        if (validStartTime) {
+            path = repositoryFiles.firstPath(c.getStartNanos());
+        } else {
+            path = repositoryFiles.lastPath();
+        }
+        if (path == null) { // closed
+            return;
+        }
+        chunkStartNanos = repositoryFiles.getTimestamp(path);
+        try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
+            chunkParser = new ChunkParser(input, c.getReuse());
+            long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
+            long filtertStart = validStartTime ? c.getStartNanos() : segmentStart;
+            long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE;
+            while (!isClosed()) {
+                boolean awaitnewEvent = false;
+                while (!isClosed() && !chunkParser.isChunkFinished()) {
+                    c = configuration;
+                    boolean ordered = c.getOrdered();
+                    chunkParser.setFlushOperation(getFlushOperation());
+                    chunkParser.setReuse(c.getReuse());
+                    chunkParser.setOrdered(ordered);
+                    chunkParser.setFilterStart(filtertStart);
+                    chunkParser.setFilterEnd(filterEnd);
+                    chunkParser.resetEventCache();
+                    chunkParser.setParserFilter(c.getFilter());
+                    chunkParser.updateEventParsers();
+                    clearLastDispatch();
+                    if (ordered) {
+                        awaitnewEvent = processOrdered(awaitnewEvent);
+                    } else {
+                        awaitnewEvent = processUnordered(awaitnewEvent);
+                    }
+                    if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
+                        close();
+                        return;
+                    }
+                }
 
-    @Override
-    public void onClose(Runnable action) {
-        Objects.requireNonNull(action);
-        eventStream.addCloseAction(action);
-    }
-
-    @Override
-    public boolean remove(Object action) {
-        Objects.requireNonNull(action);
-        return eventStream.remove(action);
-    }
-
-    @Override
-    public void awaitTermination(Duration timeout) {
-        Objects.requireNonNull(timeout);
-        eventStream.awaitTermination(timeout);
-    }
-
-    @Override
-    public void awaitTermination() {
-        eventStream.awaitTermination(Duration.ofMillis(0));
+                if (isClosed()) {
+                    return;
+                }
+                long durationNanos = chunkParser.getChunkDuration();
+                path = repositoryFiles.nextPath(chunkStartNanos + durationNanos);
+                if (path == null) {
+                    return; // stream closed
+                }
+                chunkStartNanos = repositoryFiles.getTimestamp(path);
+                input.setFile(path);
+                chunkParser = chunkParser.newChunkParser();
+                // TODO: Optimization. No need filter when we reach new chunk
+                // Could set start = 0;
+            }
+        }
     }
 
-    @Override
-    public void setReuse(boolean reuse) {
-        eventStream.setReuse(reuse);
-    }
+    private boolean processOrdered(boolean awaitNewEvents) throws IOException {
+        if (sortedList == null) {
+            sortedList = new RecordedEvent[100_000];
+        }
+        int index = 0;
+        while (true) {
+            RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+            if (e == null) {
+                // wait for new event with next call to
+                // readStreamingEvent()
+                awaitNewEvents = true;
+                break;
+            }
+            awaitNewEvents = false;
+            if (index == sortedList.length) {
+                sortedList = Arrays.copyOf(sortedList, sortedList.length * 2);
+            }
+            sortedList[index++] = e;
+        }
 
-    @Override
-    public void setOrdered(boolean ordered) {
-        eventStream.setOrdered(ordered);
-    }
-
-    @Override
-    public void setStartTime(Instant startTime) {
-        eventStream.setStartTime(startTime);
+        // no events found
+        if (index == 0 && chunkParser.isChunkFinished()) {
+            return awaitNewEvents;
+        }
+        // at least 2 events, sort them
+        if (index > 1) {
+            Arrays.sort(sortedList, 0, index, END_TIME);
+        }
+        for (int i = 0; i < index; i++) {
+            dispatch(sortedList[i]);
+        }
+        return awaitNewEvents;
     }
 
-    @Override
-    public void setEndTime(Instant endTime) {
-        eventStream.setEndTime(endTime);
-    }
-
-
-    public void start(long startNanos) {
-        eventStream.start(startNanos);
+    private boolean processUnordered(boolean awaitNewEvents) throws IOException {
+        while (true) {
+            RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
+            if (e == null) {
+                return true;
+            } else {
+                dispatch(e);
+            }
+        }
     }
-
-    public void startAsync(long startNanos) {
-        eventStream.startAsync(startNanos);
-    }
-
-    @Override
-    public void onError(Consumer<Throwable> action) {
-        // TODO Auto-generated method stub
-
-    }
-
-
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Mon Sep 02 21:08:41 2019 +0200
@@ -28,12 +28,8 @@
 import java.io.IOException;
 import java.nio.file.Path;
 import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.Arrays;
 import java.util.Objects;
-import java.util.function.Consumer;
 
 import jdk.jfr.internal.consumer.FileAccess;
 import jdk.jfr.internal.consumer.RecordingInput;
@@ -42,196 +38,110 @@
  * Implementation of an event stream that operates against a recording file.
  *
  */
-final class EventFileStream implements EventStream {
-
-    private final static class FileStream extends AbstractEventStream {
-        private static final int DEFAULT_ARRAY_SIZE = 100_000;
-
-        private final RecordingInput input;
-
-        private ChunkParser chunkParser;
-        private RecordedEvent[] sortedList;
-
-        public FileStream(AccessControlContext acc, Path path) throws IOException {
-            super(acc, false);
-            this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
-;        }
-
-        @Override
-        public void process() throws IOException {
-            final StreamConfiguration c1 = configuration;
-            long start = 0;
-            long end = Long.MAX_VALUE;
-            if (c1.getStartTime() != null) {
-                start = c1.getStartNanos();
-            }
-            if (c1.getEndTime() != null) {
-                end = c1.getEndNanos();
-            }
+final class EventFileStream extends AbstractEventStream {
+    private final RecordingInput input;
+    private ChunkParser chunkParser;
+    private RecordedEvent[] sortedList;
 
-            chunkParser = new ChunkParser(input, c1.getReuse());
-            while (!isClosed()) {
-                if (chunkParser.getStartNanos() > end) {
-                    close();
-                    return;
-                }
-                StreamConfiguration c2 = configuration;
-                boolean ordered = c2.getOrdered();
-                chunkParser.setFlushOperation(flushOperation);
-                chunkParser.setFirstNanos(start);
-                chunkParser.setLastNanos(end);
-                chunkParser.setReuse(c2.getReuse());
-                chunkParser.setOrdered(ordered);
-                chunkParser.resetEventCache();
-                chunkParser.setParserFilter(c2.getFiler());
-                chunkParser.updateEventParsers();
-                clearLastDispatch();
-                if (ordered) {
-                    processOrdered();
-                } else {
-                    processUnordered();
-                }
-                if (chunkParser.isLastChunk()) {
-                    return;
-                }
-                chunkParser = chunkParser.nextChunkParser();
-            }
-        }
-
-        private void processOrdered() throws IOException {
-            if (sortedList == null) {
-                sortedList = new RecordedEvent[DEFAULT_ARRAY_SIZE];
-            }
-            RecordedEvent event;
-            int index = 0;
-            while (true) {
-                event = chunkParser.readEvent();
-                if (event == null) {
-                    Arrays.sort(sortedList, 0, index, END_TIME);
-                    for (int i = 0; i < index; i++) {
-                        dispatch(sortedList[i]);
-                    }
-                    return;
-                }
-                if (index == sortedList.length) {
-                    RecordedEvent[] tmp = sortedList;
-                    sortedList = new RecordedEvent[2 * tmp.length];
-                    System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
-                }
-                sortedList[index++] = event;
-            }
-        }
-
-        private void processUnordered() throws IOException {
-            RecordedEvent event;
-            while (!isClosed()) {
-                event = chunkParser.readEvent();
-                if (event == null) {
-                    return;
-                }
-                dispatch(event);
-            }
-        }
-
-        @Override
-        public void close() {
-            setClosed(true);;
-            runCloseActions();
-            try {
-                input.close();
-            } catch (IOException e) {
-                // ignore
-            }
-        }
-    }
-
-    private final FileStream eventStream;
-
-    public EventFileStream(Path path, Instant from, Instant to) throws IOException {
+    public EventFileStream(AccessControlContext acc, Path path) throws IOException {
+        super(acc, false);
         Objects.requireNonNull(path);
-        eventStream = new FileStream(AccessController.getContext(), path);
+        this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
     }
 
     @Override
-    public void onEvent(Consumer<RecordedEvent> action) {
-        Objects.requireNonNull(action);
-        eventStream.onEvent(action);
+    public void start() {
+        start(0);
     }
 
     @Override
-    public void onEvent(String eventName, Consumer<RecordedEvent> action) {
-        Objects.requireNonNull(eventName);
-        Objects.requireNonNull(action);
-        eventStream.onEvent(eventName, action);
-    }
-
-    @Override
-    public void onFlush(Runnable action) {
-        Objects.requireNonNull(action);
-        eventStream.onFlush(action);
-    }
-
-    @Override
-    public void onClose(Runnable action) {
-        Objects.requireNonNull(action);
-        eventStream.addCloseAction(action);
+    public void startAsync() {
+        startAsync(0);
     }
 
     @Override
     public void close() {
-        eventStream.close();
-    }
-
-    @Override
-    public boolean remove(Object action) {
-        Objects.requireNonNull(action);
-        return eventStream.remove(action);
-    }
-
-    @Override
-    public void start() {
-        eventStream.start(0);
-    }
-
-    @Override
-    public void setReuse(boolean reuse) {
-        eventStream.setReuse(reuse);
-    }
-
-    @Override
-    public void startAsync() {
-        eventStream.startAsync(0);
+        setClosed(true);
+        runCloseActions();
+        try {
+            input.close();
+        } catch (IOException e) {
+            // ignore
+        }
     }
 
     @Override
-    public void awaitTermination(Duration timeout) {
-        Objects.requireNonNull(timeout);
-        eventStream.awaitTermination(timeout);
-    }
+    public void process() throws IOException {
+        StreamConfiguration c = configuration;
+        long start = 0;
+        long end = Long.MAX_VALUE;
+        if (c.getStartTime() != null) {
+            start = c.getStartNanos();
+        }
+        if (c.getEndTime() != null) {
+            end = c.getEndNanos();
+        }
 
-    @Override
-    public void awaitTermination() {
-        eventStream.awaitTermination();
-    }
-
-    @Override
-    public void setOrdered(boolean ordered) {
-        eventStream.setOrdered(ordered);
+        chunkParser = new ChunkParser(input, c.getReuse());
+        while (!isClosed()) {
+            if (chunkParser.getStartNanos() > end) {
+                close();
+                return;
+            }
+            c = configuration;
+            boolean ordered = c.getOrdered();
+            chunkParser.setFlushOperation(getFlushOperation());
+            chunkParser.setFilterStart(start);
+            chunkParser.setFilterEnd(end);
+            chunkParser.setReuse(c.getReuse());
+            chunkParser.setOrdered(ordered);
+            chunkParser.resetEventCache();
+            chunkParser.setParserFilter(c.getFiler());
+            chunkParser.updateEventParsers();
+            clearLastDispatch();
+            if (ordered) {
+                processOrdered();
+            } else {
+                processUnordered();
+            }
+            if (chunkParser.isLastChunk()) {
+                return;
+            }
+            chunkParser = chunkParser.nextChunkParser();
+        }
     }
 
-    @Override
-    public void setStartTime(Instant startTime) {
-        eventStream.setStartTime(startTime);
+    private void processOrdered() throws IOException {
+        if (sortedList == null) {
+            sortedList = new RecordedEvent[10_000];
+        }
+        RecordedEvent event;
+        int index = 0;
+        while (true) {
+            event = chunkParser.readEvent();
+            if (event == null) {
+                Arrays.sort(sortedList, 0, index, END_TIME);
+                for (int i = 0; i < index; i++) {
+                    dispatch(sortedList[i]);
+                }
+                return;
+            }
+            if (index == sortedList.length) {
+                RecordedEvent[] tmp = sortedList;
+                sortedList = new RecordedEvent[2 * tmp.length];
+                System.arraycopy(tmp, 0, sortedList, 0, tmp.length);
+            }
+            sortedList[index++] = event;
+        }
     }
 
-    @Override
-    public void setEndTime(Instant endTime) {
-        eventStream.setEndTime(endTime);
-    }
-
-    @Override
-    public void onError(Consumer<Throwable> action) {
-        // TODO Auto-generated method stub
-
+    private void processUnordered() throws IOException {
+        while (!isClosed()) {
+            RecordedEvent event = chunkParser.readEvent();
+            if (event == null) {
+                return;
+            }
+            dispatch(event);
+        }
     }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFilter.java	Mon Sep 02 21:08:41 2019 +0200
@@ -79,5 +79,4 @@
     Duration getThreshold() {
         return threshold;
     }
-
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Mon Sep 02 21:08:41 2019 +0200
@@ -174,12 +174,12 @@
         }
     }
 
-    public void setFirstNanos(long firstNanos) {
-        this.filterStart = firstNanos;
+    public void setFilterStart(long filterStart) {
+        this.filterStart = filterStart;
     }
 
-    public void setLastNanos(long lastNanos) {
-        this.filterEnd = lastNanos;
+    public void setFilterEnd(long filterEnd) {
+        this.filterEnd = filterEnd;
     }
 
     public void setOrdered(boolean ordered) {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Mon Sep 02 21:08:41 2019 +0200
@@ -103,7 +103,7 @@
      *         {@code checkRead} method denies read access to the file
      */
     public static EventStream openFile(Path file) throws IOException {
-        return new EventFileStream(file, null, null);
+        return new EventFileStream(AccessController.getContext(), file);
     }
 
     /**
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Mon Sep 02 21:08:41 2019 +0200
@@ -47,13 +47,14 @@
  * A recording stream produces events from the current JVM (Java Virtual
  * Machine).
  * <p>
- * The following example records events using the default configuration and
- * prints the Garbage Collection, CPU Load and JVM Information event.
+ * The following example, shows how to record events using the default
+ * configuration and print the Garbage Collection, CPU Load and JVM Information
+ * event to standard out.
  *
  * <pre>
  * <code>
- * var c = Configuration.getConfiguration("default");
- * try (var rs = new RecordingStream(c)) {
+ * Configuration c = Configuration.getConfiguration("default");
+ * try (RecordingStream rs = new RecordingStream(c)) {
  *     rs.onEvent("jdk.GarbageCollection", System.out::println);
  *     rs.onEvent("jdk.CPULoad", System.out::println);
  *     rs.onEvent("jdk.JVMInformation", System.out::println);
@@ -67,24 +68,10 @@
 public final class RecordingStream implements AutoCloseable, EventStream {
 
     private final Recording recording;
-    private final EventDirectoryStream stream;
+    private final EventDirectoryStream directoryStream;
 
     /**
      * Creates an event stream for the current JVM (Java Virtual Machine).
-     * <p>
-     * The following example shows how to create a recording stream that prints
-     * CPU usage and information about garbage collections.
-     *
-     * <pre>
-     * <code>
-     * try (var rs = new RecordingStream()) {
-     *   rs.enable("jdk.GarbageCollection");
-     *   rs.enable("jdk.CPULoad").withPeriod(Duration.ofSeconds(1));
-     *   rs.onEvent(System.out::println);
-     *   rs.start();
-     * }
-     * </code>
-     * </pre>
      *
      * @throws IllegalStateException if Flight Recorder can't be created (for
      *         example, if the Java Virtual Machine (JVM) lacks Flight Recorder
@@ -100,8 +87,9 @@
         this.recording = new Recording();
         this.recording.setFlushInterval(Duration.ofMillis(1000));
         try {
-            this.stream = new EventDirectoryStream(acc, null, SecuritySupport.PRIVILIGED, true);
+            this.directoryStream = new EventDirectoryStream(acc, null, SecuritySupport.PRIVILIGED, true);
         } catch (IOException ioe) {
+            this.recording.close();
             throw new IllegalStateException(ioe.getMessage());
         }
     }
@@ -274,56 +262,6 @@
         recording.setMaxSize(maxSize);
     }
 
-    @Override
-    public void onEvent(String eventName, Consumer<RecordedEvent> action) {
-        stream.onEvent(eventName, action);
-    }
-
-    @Override
-    public void onEvent(Consumer<RecordedEvent> action) {
-        stream.onEvent(action);
-    }
-
-    @Override
-    public void onFlush(Runnable action) {
-        stream.onFlush(action);
-    }
-
-    @Override
-    public void onClose(Runnable action) {
-        stream.onClose(action);
-    }
-
-    @Override
-    public void close() {
-        recording.close();
-        stream.close();
-    }
-
-    @Override
-    public boolean remove(Object action) {
-        return stream.remove(action);
-    }
-
-    @Override
-    public void start() {
-        PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
-        long startNanos = pr.start();
-        stream.start(startNanos);
-    }
-
-    @Override
-    public void startAsync() {
-        PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
-        long startNanos = pr.start();
-        stream.startAsync(startNanos);
-    }
-
-    @Override
-    public void awaitTermination(Duration timeout) {
-        stream.awaitTermination(timeout);
-    }
-
     /**
      * Determines how often events are made available for streaming.
      *
@@ -339,32 +277,82 @@
     }
 
     @Override
-    public void awaitTermination() {
-        stream.awaitTermination();
-    }
-
-    @Override
     public void setReuse(boolean reuse) {
-        stream.setReuse(reuse);
+        directoryStream.setReuse(reuse);
     }
 
     @Override
     public void setOrdered(boolean ordered) {
-        stream.setOrdered(ordered);
+        directoryStream.setOrdered(ordered);
     }
 
     @Override
     public void setStartTime(Instant startTime) {
-        stream.setStartTime(startTime);
+        directoryStream.setStartTime(startTime);
     }
 
     @Override
     public void setEndTime(Instant endTime) {
-        stream.setStartTime(endTime);
+        directoryStream.setStartTime(endTime);
+    }
+
+    @Override
+    public void onEvent(String eventName, Consumer<RecordedEvent> action) {
+        directoryStream.onEvent(eventName, action);
+    }
+
+    @Override
+    public void onEvent(Consumer<RecordedEvent> action) {
+        directoryStream.onEvent(action);
+    }
+
+    @Override
+    public void onFlush(Runnable action) {
+        directoryStream.onFlush(action);
+    }
+
+    @Override
+    public void onClose(Runnable action) {
+        directoryStream.onClose(action);
     }
 
     @Override
     public void onError(Consumer<Throwable> action) {
-        stream.onError(action);
+        directoryStream.onError(action);
+    }
+
+    @Override
+    public void close() {
+        recording.close();
+        directoryStream.close();
+    }
+
+    @Override
+    public boolean remove(Object action) {
+        return directoryStream.remove(action);
+    }
+
+    @Override
+    public void start() {
+        PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
+        long startNanos = pr.start();
+        directoryStream.start(startNanos);
+    }
+
+    @Override
+    public void startAsync() {
+        PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
+        long startNanos = pr.start();
+        directoryStream.startAsync(startNanos);
+    }
+
+    @Override
+    public void awaitTermination(Duration timeout) {
+        directoryStream.awaitTermination(timeout);
+    }
+
+    @Override
+    public void awaitTermination() {
+        directoryStream.awaitTermination();
     }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java	Mon Sep 02 21:03:40 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/SecuritySupport.java	Mon Sep 02 21:08:41 2019 +0200
@@ -436,7 +436,7 @@
         });
     }
 
-    static Thread createThreadWitNoPermissions(String threadName, Runnable runnable) {
+    public static Thread createThreadWitNoPermissions(String threadName, Runnable runnable) {
         return doPrivilegedWithReturn(() -> new Thread(runnable, threadName), new Permission[0]);
     }