Restructure stream configuration JEP-349-branch
authoregahlin
Fri, 13 Sep 2019 18:46:07 +0200
branchJEP-349-branch
changeset 58129 7b751fe181a5
parent 58121 6f8f18ac1d54
child 58145 bc54ed8d908a
child 58155 1fe292a94df1
Restructure stream configuration
src/java.desktop/windows/native/common/awt/systemscale/systemScale.cpp
src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/Dispatcher.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java
src/jdk.jfr/share/classes/jdk/jfr/internal/EventInstrumentation.java
src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java
test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java
--- a/src/java.desktop/windows/native/common/awt/systemscale/systemScale.cpp	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/java.desktop/windows/native/common/awt/systemscale/systemScale.cpp	Fri Sep 13 18:46:07 2019 +0200
@@ -77,7 +77,7 @@
         HRESULT res = D2D1CreateFactory(D2D1_FACTORY_TYPE_SINGLE_THREADED,
                                         &m_pDirect2dFactory);
         if (res == S_OK) {
-            m_pDirect2dFactory->GetDesktopDpi(dpiX, dpiY);
+           // m_pDirect2dFactory->GetDesktopDpi(dpiX, dpiY);
             m_pDirect2dFactory->Release();
         }
     }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Fri Sep 13 18:46:07 2019 +0200
@@ -26,20 +26,16 @@
 package jdk.jfr.consumer;
 
 import java.io.IOException;
-import java.lang.invoke.VarHandle;
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
-import jdk.jfr.EventType;
 import jdk.jfr.internal.JVM;
 import jdk.jfr.internal.LogLevel;
 import jdk.jfr.internal.LogTag;
@@ -59,40 +55,19 @@
  */
 abstract class AbstractEventStream implements EventStream {
 
-    final static class EventDispatcher {
-        final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-        final String eventName;
-        final Consumer<RecordedEvent> action;
-
-        public EventDispatcher(Consumer<RecordedEvent> action) {
-            this(null, action);
-        }
+    static final Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
 
-        public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
-            this.eventName = eventName;
-            this.action = action;
-        }
-
-        public void offer(RecordedEvent event) {
-            action.accept(event);
-        }
-
-        public boolean accepts(EventType eventType) {
-            return (eventName == null || eventType.getName().equals(eventName));
-        }
-    }
-
-    final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
     private final static AtomicLong counter = new AtomicLong(1);
-    private volatile Thread thread;
     private final Object terminated = new Object();
     private final boolean active;
-    private final Runnable flushOperation = () -> runFlushActions();
+    private final Runnable flushOperation = () -> dispatcher().runFlushActions();
     private final AccessControlContext accessControllerContext;
-    private final Object configurationLock = new Object();
+    private final StreamConfiguration configuration = new StreamConfiguration();
 
-    // Modified by updateConfiguration()
-    protected volatile StreamConfiguration configuration = new StreamConfiguration();
+    private volatile Thread thread;
+    private Dispatcher dispatcher;
+
+    private volatile boolean closed;
 
     public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
         this.accessControllerContext = Objects.requireNonNull(acc);
@@ -108,99 +83,85 @@
     @Override
     abstract public void close();
 
-    // Purpose of synchronizing the following methods is
-    // to serialize changes to the configuration so only one
-    // thread at a time can change the configuration.
-    //
-    // The purpose is not to guard the configuration field. A new
-    // configuration is published using updateConfiguration
-    //
+    protected final Dispatcher dispatcher() {
+        if (configuration.hasChanged()) {
+            synchronized (configuration) {
+                dispatcher = new Dispatcher(configuration);
+            }
+        }
+        return dispatcher;
+    }
+
     @Override
     public final void setOrdered(boolean ordered) {
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).setOrdered(ordered));
-        }
+        configuration.setOrdered(ordered);
     }
 
     @Override
     public final void setReuse(boolean reuse) {
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).setReuse(reuse));
-        }
+        configuration.setReuse(reuse);
     }
 
     @Override
     public final void setStartTime(Instant startTime) {
         Objects.nonNull(startTime);
-        synchronized (configurationLock) {
-            if (configuration.isStarted()) {
+        synchronized (configuration) {
+            if (configuration.started) {
                 throw new IllegalStateException("Stream is already started");
             }
             if (startTime.isBefore(Instant.EPOCH)) {
                 startTime = Instant.EPOCH;
             }
-            updateConfiguration(new StreamConfiguration(configuration).setStartTime(startTime));
+            configuration.setStartTime(startTime);
         }
     }
 
     @Override
     public final void setEndTime(Instant endTime) {
         Objects.requireNonNull(endTime);
-        synchronized (configurationLock) {
-            if (configuration.isStarted()) {
+        synchronized (configuration) {
+            if (configuration.started) {
                 throw new IllegalStateException("Stream is already started");
             }
-            updateConfiguration(new StreamConfiguration(configuration).setEndTime(endTime));
+            configuration.setEndTime(endTime);
         }
     }
 
     @Override
     public final void onEvent(Consumer<RecordedEvent> action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            add(new EventDispatcher(action));
-        }
+        configuration.addEventAction(action);
     }
 
     @Override
     public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
         Objects.requireNonNull(eventName);
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            add(new EventDispatcher(eventName, action));
-        }
+        configuration.addEventAction(eventName, action);
     }
 
     @Override
     public final void onFlush(Runnable action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).addFlushAction(action));
-        }
+        configuration.addFlushAction(action);
     }
 
     @Override
     public final void onClose(Runnable action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).addCloseAction(action));
-        }
+        configuration.addCloseAction(action);
     }
 
     @Override
     public final void onError(Consumer<Throwable> action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).addErrorAction(action));
-        }
+        configuration.addErrorAction(action);
     }
 
     @Override
     public final boolean remove(Object action) {
         Objects.requireNonNull(action);
-        synchronized (configurationLock) {
-            return updateConfiguration(new StreamConfiguration(configuration).remove(action));
-        }
+        return configuration.remove(action);
     }
 
     @Override
@@ -247,53 +208,12 @@
 
     protected abstract void process() throws Exception;
 
-    protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
-        EventType type = event.getEventType();
-        EventDispatcher[] dispatchers = null;
-        if (type == c.cacheEventType) {
-            dispatchers = c.cacheDispatchers;
-        } else {
-            dispatchers = c.dispatcherLookup.get(type.getId());
-            if (dispatchers == null) {
-                List<EventDispatcher> list = new ArrayList<>();
-                for (EventDispatcher e : c.getDispatchers()) {
-                    if (e.accepts(type)) {
-                        list.add(e);
-                    }
-                }
-                dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
-                c.dispatcherLookup.put(type.getId(), dispatchers);
-            }
-            c.cacheDispatchers = dispatchers;
-        }
-        for (int i = 0; i < dispatchers.length; i++) {
-            try {
-                dispatchers[i].offer(event);
-            } catch (Exception e) {
-                handleError(e);
-            }
-        }
-    }
-
-    protected final void runCloseActions() {
-        Runnable[] closeActions = configuration.getCloseActions();
-        for (int i = 0; i < closeActions.length; i++) {
-            try {
-                closeActions[i].run();
-            } catch (Exception e) {
-                handleError(e);
-            }
-        }
-    }
-
     protected final void setClosed(boolean closed) {
-        synchronized (configurationLock) {
-            updateConfiguration(new StreamConfiguration(configuration).setClosed(closed));
-        }
+        this.closed = closed;
     }
 
     protected final boolean isClosed() {
-        return configuration.isClosed();
+        return closed;
     }
 
     protected final void startAsync(long startNanos) {
@@ -313,34 +233,15 @@
         return flushOperation;
     }
 
-    private void add(EventDispatcher e) {
-        updateConfiguration(new StreamConfiguration(configuration).addDispatcher(e));
-    }
-
-    private boolean updateConfiguration(StreamConfiguration newConfiguration) {
-        if (!Thread.holdsLock(configurationLock)) {
-            throw new InternalError("Modification of configuration without proper lock");
-        }
-        if (newConfiguration.hasChanged()) {
-            // Publish objects held by configuration object
-            VarHandle.releaseFence();
-            configuration = newConfiguration;
-            return true;
-        }
-        return false;
-    }
-
     private void startInternal(long startNanos) {
-        synchronized (configurationLock) {
-            if (configuration.isStarted()) {
+        synchronized (configuration) {
+            if (configuration.started) {
                 throw new IllegalStateException("Event stream can only be started once");
             }
-            StreamConfiguration c = new StreamConfiguration(configuration);
             if (active) {
-                c.setStartNanos(startNanos);
+                configuration.setStartNanos(startNanos);
             }
-            c.setStarted(true);
-            updateConfiguration(c);
+            configuration.setStarted(true);
         }
     }
 
@@ -348,8 +249,14 @@
         JVM.getJVM().exclude(Thread.currentThread());
         try {
             process();
+        } catch (IOException ioe) {
+            // This can happen if a chunk file is removed, or
+            // a file is access that has been closed
+            // This is "normal" behavior for streaming and the
+            // stream will be closed when this happens
         } catch (Exception e) {
-            defaultErrorHandler(e);
+            // TODO: Remove before integrating
+            e.printStackTrace();
         } finally {
             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
             try {
@@ -362,30 +269,6 @@
         }
     }
 
-    private void handleError(Throwable e) {
-        Consumer<?>[] consumers = configuration.errorActions;
-        if (consumers.length == 0) {
-            defaultErrorHandler(e);
-            return;
-        }
-        for (int i = 0; i < consumers.length; i++) {
-            @SuppressWarnings("unchecked")
-            Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
-            conusmer.accept(e);
-        }
-    }
-
-    private void runFlushActions() {
-        Runnable[] flushActions = configuration.getFlushActions();
-        for (int i = 0; i < flushActions.length; i++) {
-            try {
-                flushActions[i].run();
-            } catch (Exception e) {
-                handleError(e);
-            }
-        }
-    }
-
     private void run(AccessControlContext accessControlContext) {
         AccessController.doPrivileged(new PrivilegedAction<Void>() {
             @Override
@@ -400,8 +283,4 @@
         counter.incrementAndGet();
         return "JFR Event Stream " + counter;
     }
-
-    private void defaultErrorHandler(Throwable e) {
-        e.printStackTrace();
-    }
 }
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Fri Sep 13 18:46:07 2019 +0200
@@ -48,6 +48,28 @@
  *
  */
 final class ChunkParser {
+
+    static final class ParserConfiguration {
+        final boolean reuse;
+        final boolean ordered;
+        final InternalEventFilter eventFilter;
+
+        long filterStart;
+        long filterEnd;
+
+        public ParserConfiguration(long filterStart, long filterEnd, boolean reuse, boolean ordered, InternalEventFilter filter) {
+            this.filterStart = filterStart;
+            this.filterEnd = filterEnd;
+            this.reuse = reuse;
+            this.ordered = ordered;
+            this.eventFilter = filter;
+        }
+
+        public ParserConfiguration() {
+            this(0, Long.MAX_VALUE, false, false, InternalEventFilter.ACCEPT_ALL);
+        }
+    }
+
     // Checkpoint that finishes a flush segment
     static final byte CHECKPOINT_FLUSH_MASK = 1;
     // Checkpoint contains chunk header information in the first pool
@@ -70,24 +92,24 @@
     private LongMap<Type> typeMap;
     private LongMap<Parser> parsers;
     private boolean chunkFinished;
-    private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
-    private boolean reuse;
-    private boolean ordered;
-    private boolean resetEventCache;
-    private long filterStart = 0;
-    private long filterEnd = Long.MAX_VALUE;
+
     private Runnable flushOperation;
+    private ParserConfiguration configuration;
 
-    public ChunkParser(RecordingInput input, boolean reuse) throws IOException {
-       this(new ChunkHeader(input), null, 1000);
-       this.reuse = reuse;
+    public ChunkParser(RecordingInput input) throws IOException {
+        this(input, new ParserConfiguration());
+    }
+
+    public ChunkParser(RecordingInput input, ParserConfiguration pc) throws IOException {
+       this(new ChunkHeader(input), null, pc);
     }
 
     public ChunkParser(ChunkParser previous) throws IOException {
-        this(new ChunkHeader(previous.input), previous, 1000);
+        this(new ChunkHeader(previous.input), previous, new ParserConfiguration());
      }
 
-    private ChunkParser(ChunkHeader header, ChunkParser previous, long pollInterval) throws IOException {
+    private ChunkParser(ChunkHeader header, ChunkParser previous, ParserConfiguration pc) throws IOException {
+        this.configuration = pc;
         this.input = header.getInput();
         this.chunkHeader = header;
         if (previous == null) {
@@ -98,9 +120,7 @@
             this.constantLookups = previous.constantLookups;
             this.previousMetadata = previous.metadata;
             this.pollInterval = previous.pollInterval;
-            this.ordered = previous.ordered;
-            this.reuse = previous.reuse;
-            this.eventFilter = previous.eventFilter;
+            this.configuration = previous.configuration;
         }
         this.metadata = header.readMetadata(previousMetadata);
         this.timeConverter = new TimeConverter(chunkHeader, metadata.getGMTOffset());
@@ -108,7 +128,7 @@
             ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
             parsers = factory.getParsers();
             typeMap = factory.getTypeMap();
-            updateEventParsers();
+            updateConfiguration();
         } else {
             parsers = previous.parsers;
             typeMap = previous.typeMap;
@@ -122,12 +142,37 @@
         input.position(chunkHeader.getEventStart());
     }
 
-    public void setParserFilter(InternalEventFilter filter) {
-        this.eventFilter = filter;
+    public ChunkParser nextChunkParser() throws IOException {
+        return new ChunkParser(chunkHeader.nextHeader(), this, configuration);
+    }
+
+    private void updateConfiguration() {
+        updateConfiguration(configuration, false);
     }
 
-    public InternalEventFilter getEventFilter() {
-        return this.eventFilter;
+    public void updateConfiguration(ParserConfiguration configuration, boolean resetEventCache) {
+        this.configuration = configuration;
+        parsers.forEach(p -> {
+            if (p instanceof EventParser) {
+                EventParser ep = (EventParser) p;
+                if (resetEventCache) {
+                    ep.resetCache();
+                }
+                String name = ep.getEventType().getName();
+                ep.setOrdered(configuration.ordered);
+                ep.setReuse(configuration.reuse);
+                ep.setFilterStart(configuration.filterStart);
+                ep.setFilterEnd(configuration.filterEnd);
+                long threshold = configuration.eventFilter.getThreshold(name);
+                if (threshold >= 0) {
+                    ep.setEnabled(true);
+                    ep.setThresholdNanos(threshold);
+                } else {
+                    ep.setEnabled(false);
+                    ep.setThresholdNanos(Long.MAX_VALUE);
+                }
+            }
+        });
     }
 
     /**
@@ -161,7 +206,7 @@
                 ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
                 parsers = factory.getParsers();
                 typeMap = factory.getTypeMap();
-                updateEventParsers();
+                updateConfiguration();;
             }
             if (contantPosition != chunkHeader.getConstantPoolPosition()) {
                 Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
@@ -372,76 +417,14 @@
         return new ChunkParser(this);
     }
 
-    public ChunkParser nextChunkParser() throws IOException {
-        return new ChunkParser(chunkHeader.nextHeader(), this, pollInterval);
-    }
-
     public boolean isChunkFinished() {
         return chunkFinished;
     }
 
-    // Need to call updateEventParsers() for
-    // change to take effect
-    public void setReuse(boolean resue) {
-        this.reuse = resue;
-    }
-
     public void setFlushOperation(Runnable flushOperation) {
         this.flushOperation = flushOperation;
     }
 
-    // Need to call updateEventParsers() for
-    // change to take effect
-    public void setOrdered(boolean ordered) {
-        this.ordered = ordered;
-    }
-
-    // Need to call updateEventParsers() for
-    // change to take effect
-    public void setFilterStart(long filterStart) {
-        long chunkStart = chunkHeader.getStartNanos();
-        // Optimization.
-        if (filterStart < chunkStart - 1_000_000_000L) {
-            filterStart = 0;
-        }
-        this.filterStart = filterStart;
-    }
-
-    public void setFilterEnd(long filterEnd) {
-        this.filterEnd = filterEnd;
-    }
-
-    // Need to call updateEventParsers() for
-    // change to take effect
-    public void resetEventCache() {
-        this.resetEventCache = true;
-    }
-
-    public void updateEventParsers() {
-        parsers.forEach(p -> {
-            if (p instanceof EventParser) {
-                EventParser ep = (EventParser) p;
-                String name = ep.getEventType().getName();
-                ep.setOrdered(ordered);
-                ep.setReuse(reuse);
-                ep.setFilterStart(filterStart);
-                ep.setFilterEnd(filterEnd);
-                if (resetEventCache) {
-                    ep.resetCache();
-                }
-                long threshold = eventFilter.getThreshold(name);
-                if (threshold >= 0) {
-                    ep.setEnabled(true);
-                    ep.setThresholdNanos(threshold);
-                } else {
-                    ep.setEnabled(false);
-                    ep.setThresholdNanos(Long.MAX_VALUE);
-                }
-            }
-        });
-        resetEventCache = false;
-    }
-
     public long getChunkDuration() {
         return chunkHeader.getDurationNanos();
     }
@@ -449,4 +432,5 @@
     public long getStartNanos() {
         return chunkHeader.getStartNanos();
     }
+
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/Dispatcher.java	Fri Sep 13 18:46:07 2019 +0200
@@ -0,0 +1,144 @@
+package jdk.jfr.consumer;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import jdk.jfr.EventType;
+import jdk.jfr.consumer.ChunkParser.ParserConfiguration;
+import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.consumer.InternalEventFilter;
+
+public final class Dispatcher {
+
+    public final static class EventDispatcher {
+        final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
+        final String eventName;
+        final Consumer<RecordedEvent> action;
+
+        public EventDispatcher(Consumer<RecordedEvent> action) {
+            this(null, action);
+        }
+
+        public EventDispatcher(String eventName, Consumer<RecordedEvent> action) {
+            this.eventName = eventName;
+            this.action = action;
+        }
+
+        public void offer(RecordedEvent event) {
+            action.accept(event);
+        }
+
+        public boolean accepts(EventType eventType) {
+            return (eventName == null || eventType.getName().equals(eventName));
+        }
+    }
+
+    final Consumer<Throwable>[] errorActions;
+    final Runnable[] flushActions;
+    final Runnable[] closeActions;
+    final EventDispatcher[] dispatchers;
+    final LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+    final ParserConfiguration parserConfiguration;
+    final Instant startTime;
+    final Instant endTime;
+    final long startNanos;
+    final long endNanos;
+
+    // Cache
+    private EventType cacheEventType;
+    private EventDispatcher[] cacheDispatchers;
+
+    @SuppressWarnings({"unchecked","rawtypes"})
+    public Dispatcher(StreamConfiguration c) {
+        this.flushActions = c.flushActions.toArray(new Runnable[0]);
+        this.closeActions = c.closeActions.toArray(new Runnable[0]);
+        this.errorActions = c.errorActions.toArray(new Consumer[0]);
+        this.dispatchers = c.eventActions.toArray(new EventDispatcher[0]);
+        this.parserConfiguration = new ParserConfiguration(0, Long.MAX_VALUE, c.reuse, c.ordered, buildFilter(dispatchers));
+        this.startTime = c.startTime;
+        this.endTime = c.endTime;
+        this.startNanos = c.startNanos;
+        this.endNanos = c.endNanos;
+    }
+
+    private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
+        InternalEventFilter ef = new InternalEventFilter();
+        for (EventDispatcher ed : dispatchers) {
+            String name = ed.eventName;
+            if (name == null) {
+                return InternalEventFilter.ACCEPT_ALL;
+            }
+            ef.setThreshold(name, 0);
+        }
+        return ef;
+    }
+
+    protected final void dispatch(RecordedEvent event) {
+        EventType type = event.getEventType();
+        EventDispatcher[] dispatchers = null;
+        if (type == cacheEventType) {
+            dispatchers = cacheDispatchers;
+        } else {
+            dispatchers = dispatcherLookup.get(type.getId());
+            if (dispatchers == null) {
+                List<EventDispatcher> list = new ArrayList<>();
+                for (EventDispatcher e : this.dispatchers) {
+                    if (e.accepts(type)) {
+                        list.add(e);
+                    }
+                }
+                dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
+                dispatcherLookup.put(type.getId(), dispatchers);
+            }
+            cacheDispatchers = dispatchers;
+        }
+        for (int i = 0; i < dispatchers.length; i++) {
+            try {
+                dispatchers[i].offer(event);
+            } catch (Exception e) {
+                handleError(e);
+            }
+        }
+    }
+
+    public void handleError(Throwable e) {
+        Consumer<?>[] consumers = this.errorActions;
+        if (consumers.length == 0) {
+            defaultErrorHandler(e);
+            return;
+        }
+        for (int i = 0; i < consumers.length; i++) {
+            @SuppressWarnings("unchecked")
+            Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
+            conusmer.accept(e);
+        }
+    }
+
+    public void runFlushActions() {
+        Runnable[] flushActions = this.flushActions;
+        for (int i = 0; i < flushActions.length; i++) {
+            try {
+                flushActions[i].run();
+            } catch (Exception e) {
+                handleError(e);
+            }
+        }
+    }
+
+    public void runCloseActions() {
+        Runnable[] closeActions = this.closeActions;
+        for (int i = 0; i < closeActions.length; i++) {
+            try {
+                closeActions[i].run();
+            } catch (Exception e) {
+                handleError(e);
+            }
+        }
+    }
+
+    void defaultErrorHandler(Throwable e) {
+        e.printStackTrace();
+    }
+}
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Fri Sep 13 18:46:07 2019 +0200
@@ -32,6 +32,7 @@
 import java.util.Arrays;
 import java.util.Objects;
 
+import jdk.jfr.consumer.ChunkParser.ParserConfiguration;
 import jdk.jfr.internal.Utils;
 import jdk.jfr.internal.consumer.FileAccess;
 import jdk.jfr.internal.consumer.RecordingInput;
@@ -46,6 +47,7 @@
     private final RepositoryFiles repositoryFiles;
     private final boolean active;
     private final FileAccess fileAccess;
+
     private ChunkParser chunkParser;
     private long chunkStartNanos;
     private RecordedEvent[] sortedList;
@@ -60,7 +62,7 @@
     @Override
     public void close() {
         setClosed(true);
-        runCloseActions();
+        dispatcher().runCloseActions();
         repositoryFiles.close();
     }
 
@@ -76,11 +78,12 @@
 
     @Override
     protected void process() throws Exception {
-        StreamConfiguration c = configuration;
+        Dispatcher disp = dispatcher();
+
         Path path;
-        boolean validStartTime = active || c.getStartTime() != null;
+        boolean validStartTime = active || disp.startTime != null;
         if (validStartTime) {
-            path = repositoryFiles.firstPath(c.getStartNanos());
+            path = repositoryFiles.firstPath(disp.startNanos);
         } else {
             path = repositoryFiles.lastPath();
         }
@@ -89,28 +92,24 @@
         }
         chunkStartNanos = repositoryFiles.getTimestamp(path);
         try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
-            chunkParser = new ChunkParser(input, c.getReuse());
+            chunkParser = new ChunkParser(input, disp.parserConfiguration);
             long segmentStart = chunkParser.getStartNanos() + chunkParser.getChunkDuration();
-            long filtertStart = validStartTime ? c.getStartNanos() : segmentStart;
-            long filterEnd = c.getEndTime() != null ? c.getEndNanos() : Long.MAX_VALUE;
+            long filterStart = validStartTime ? disp.startNanos : segmentStart;
+            long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
+
             while (!isClosed()) {
                 boolean awaitnewEvent = false;
                 while (!isClosed() && !chunkParser.isChunkFinished()) {
-                    c = configuration;
-                    boolean ordered = c.getOrdered();
+                    disp = dispatcher();
+                    ParserConfiguration pc = disp.parserConfiguration;
+                    pc.filterStart = filterStart;
+                    pc.filterEnd = filterEnd;
+                    chunkParser.updateConfiguration(pc, true);
                     chunkParser.setFlushOperation(getFlushOperation());
-                    chunkParser.setReuse(c.getReuse());
-                    chunkParser.setOrdered(ordered);
-                    chunkParser.setFilterStart(filtertStart);
-                    chunkParser.setFilterEnd(filterEnd);
-                    chunkParser.resetEventCache();
-                    chunkParser.setParserFilter(c.getFilter());
-                    chunkParser.updateEventParsers();
-                    c.clearDispatchCache();
-                    if (ordered) {
-                        awaitnewEvent = processOrdered(c, awaitnewEvent);
+                    if (pc.ordered) {
+                        awaitnewEvent = processOrdered(disp, awaitnewEvent);
                     } else {
-                        awaitnewEvent = processUnordered(c, awaitnewEvent);
+                        awaitnewEvent = processUnordered(disp, awaitnewEvent);
                     }
                     if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
                         close();
@@ -135,7 +134,7 @@
         }
     }
 
-    private boolean processOrdered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
+    private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
         if (sortedList == null) {
             sortedList = new RecordedEvent[100_000];
         }
@@ -164,18 +163,18 @@
             Arrays.sort(sortedList, 0, index, END_TIME);
         }
         for (int i = 0; i < index; i++) {
-            dispatch(c, sortedList[i]);
+            c.dispatch(sortedList[i]);
         }
         return awaitNewEvents;
     }
 
-    private boolean processUnordered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
+    private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
         while (true) {
             RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
             if (e == null) {
                 return true;
             } else {
-                dispatch(c, e);
+                c.dispatch(e);
             }
         }
     }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Fri Sep 13 18:46:07 2019 +0200
@@ -62,7 +62,7 @@
     @Override
     public void close() {
         setClosed(true);
-        runCloseActions();
+        dispatcher().runCloseActions();
         try {
             input.close();
         } catch (IOException e) {
@@ -72,46 +72,40 @@
 
     @Override
     protected void process() throws IOException {
-        StreamConfiguration c = configuration;
+        Dispatcher disp = dispatcher();
         long start = 0;
         long end = Long.MAX_VALUE;
-        if (c.getStartTime() != null) {
-            start = c.getStartNanos();
+        if (disp.startTime != null) {
+            start = disp.startNanos;
         }
-        if (c.getEndTime() != null) {
-            end = c.getEndNanos();
+        if (disp.endTime != null) {
+            end = disp.endNanos;
         }
 
-        chunkParser = new ChunkParser(input, c.getReuse());
+        chunkParser = new ChunkParser(input, disp.parserConfiguration);
         while (!isClosed()) {
             if (chunkParser.getStartNanos() > end) {
                 close();
                 return;
             }
-            c = configuration;
-            boolean ordered = c.getOrdered();
+            disp = dispatcher();
+            disp.parserConfiguration.filterStart = start;
+            disp.parserConfiguration.filterEnd = end;
+            chunkParser.updateConfiguration(disp.parserConfiguration, true);
             chunkParser.setFlushOperation(getFlushOperation());
-            chunkParser.setFilterStart(start);
-            chunkParser.setFilterEnd(end);
-            chunkParser.setReuse(c.getReuse());
-            chunkParser.setOrdered(ordered);
-            chunkParser.resetEventCache();
-            chunkParser.setParserFilter(c.getFiler());
-            chunkParser.updateEventParsers();
-            c.clearDispatchCache();
-            if (ordered) {
-                processOrdered(c);
+            if (disp.parserConfiguration.ordered) {
+                processOrdered(disp);
             } else {
-                processUnordered(c);
+                processUnordered(disp);
             }
-            if (chunkParser.isLastChunk()) {
+            if (isClosed() || chunkParser.isLastChunk()) {
                 return;
             }
             chunkParser = chunkParser.nextChunkParser();
         }
     }
 
-    private void processOrdered(StreamConfiguration c) throws IOException {
+    private void processOrdered(Dispatcher c) throws IOException {
         if (sortedList == null) {
             sortedList = new RecordedEvent[10_000];
         }
@@ -122,7 +116,7 @@
             if (event == null) {
                 Arrays.sort(sortedList, 0, index, END_TIME);
                 for (int i = 0; i < index; i++) {
-                    dispatch(c, sortedList[i]);
+                    c.dispatch(sortedList[i]);
                 }
                 return;
             }
@@ -135,13 +129,13 @@
         }
     }
 
-    private void processUnordered(StreamConfiguration c) throws IOException {
+    private void processUnordered(Dispatcher c) throws IOException {
         while (!isClosed()) {
             RecordedEvent event = chunkParser.readEvent();
             if (event == null) {
                 return;
             }
-            dispatch(c, event);
+            c.dispatch(event);
         }
     }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Fri Sep 13 18:46:07 2019 +0200
@@ -40,12 +40,60 @@
 
 /**
  * Represents a stream of events.
+ * <p>
+ * A stream is a sequence of events and the way to interact with a stream is to
+ * register actions.
+ * <p>
+ * To receive a notification when an event arrives, register an action using the
+ * {@link #onEvent(Consumer)} method. To filter the stream for an event with a
+ * specific name, use {@link #onEvent(String, Consumer)} method.
+ *
+ * By default, the same {@code RecordedEvent} object can be used for
+ * representing two or more distinct events. The object can be delivered
+ * multiple times to the same action as well as to other actions. If the life
+ * cycle of the event object is needed outside the scope of an action, the
+ * {@link #setReuse(boolean)} method should be set to {@code false} so that a
+ * new object is allocated for each event.
+ *
+ * <p>
+ * Events are delivered in batches. To receive a notification when a batch is
+ * complete, register an action using the {@link #onFlush(Runnable)} method.
+ * This is an opportunity to aggregate or push data to external systems while
+ * the Java Virtual Machine (JVM) is preparing the next batch.
+ * <p>
+ * Events within a batch are sorted chronologically by their end time. If
+ * ordering is not a concern, sorting can be disabled using the
+ * {@link #setOrdered(boolean)} method.
+ * <p>
+ * To dispatch events to registered actions, the stream must be started. To
+ * start processing in the current thread, invoke the {@link #start()} method.
+ * To process actions asynchronously in a separate thread, invoke the
+ * {@link #startAsync()} method. To await completion of the stream, use the
+ * awaitTermination {@link #awaitTermination()} or the {link
+ * {@link #awaitTermination(Duration)} method.
+ * <p>
+ * When a stream ends it is automatically closed. To manually stop processing of
+ * events, close the stream with the {@link #close()} method. A stream can also
+ * be automatically closed in exceptional circumstances, for instance if the JVM
+ * exits. To receive a notification in any of these occasions, use the
+ * {@link #onClose(Runnable)} method to register an action.
+ * <p>
+ * If an unexpected exception occurs in an action, it is possible to catch the
+ * exception in an error handler. An error handler can be registered using the
+ * {@link #onError(Runnable)} method. If no error handler is registered, the
+ * default behavior is to print the exception and its backtrace to the standard
+ * error stream.
+ * <p>
+ * The following example demonstrates how an {@code EventStream} can be used to
+ * listen to garbage collection and CPU Load events
+ * <p>
+ *
  */
 public interface EventStream extends AutoCloseable {
 
     /**
-     * Creates a stream from the disk repository of the current Java Virtual
-     * Machine (JVM).
+     * Creates a stream from the repository of the current Java Virtual Machine
+     * (JVM).
      * <p>
      * By default, the stream starts with the next event flushed by Flight
      * Recorder.
@@ -96,7 +144,7 @@
      *
      * @return an event stream, not {@code null}
      *
-     * @throws IOException if a stream can't be opened, or an I/O error occurs
+     * @throws IOException if the file can't be opened, or an I/O error occurs
      *         during reading
      *
      * @throws SecurityException if a security manager exists and its
@@ -107,71 +155,75 @@
     }
 
     /**
-     * Performs an action on all events in the stream.
+     * Registers an action to perform on all events in the stream.
      *
-     * @param action an action to be performed on each {@code RecordedEvent},
-     *        not {@code null}
+     * @param action an action to perform on each {@code RecordedEvent}, not
+     *        {@code null}
      */
     void onEvent(Consumer<RecordedEvent> action);
 
     /**
-     * Performs an action on all events in the stream with a specified name.
+     * Registers an action to perform on all events matching a name.
      *
      * @param eventName the name of the event, not {@code null}
      *
-     * @param action an action to be performed on each {@code RecordedEvent}
-     *        that matches the event name, not {@code null}
+     * @param action an action to perform on each {@code RecordedEvent} matching
+     *        the event name, not {@code null}
      */
     void onEvent(String eventName, Consumer<RecordedEvent> action);
 
     /**
-     * Performs an action when the event stream has been flushed.
+     * Registers an action to perform after the stream has been flushed.
      *
-     * @param action an action to be performed after stream has been flushed,
-     *        not {@code null}
+     * @param action an action to perform after the stream has been
+     *        flushed, not {@code null}
      */
     void onFlush(Runnable action);
 
     /**
-     * Performs an action if an exception occurs when processing the stream.
+     * Registers an action to perform if an exception occurs.
      * <p>
-     * if an error handler has not been added to the stream, an exception stack
-     * trace is printed to standard error.
+     * if an action is not registered, an exception stack trace is printed to
+     * standard error.
      * <p>
-     * Adding an error handler overrides the default behavior. If multiple error
-     * handlers have been added, they will be executed in the order they were
-     * added.
+     * Registering an action overrides the default behavior. If multiple actions
+     * have been registered, they are performed in the order of registration.
+     * <p>
+     * If this method itself throws an exception, resulting behavior is
+     * undefined.
      *
-     * @param action an action to be performed if an exception occurs, not
+     * @param action an action to perform if an exception occurs, not
      *        {@code null}
      */
     void onError(Consumer<Throwable> action);
 
     /**
-     * Performs an action when the event stream is closed.
+     * Registers an action to perform when the stream is closed.
      * <p>
-     * If the stream is already closed, the action will be executed immediately
+     * If the stream is already closed, the action will be performed immediately
      * in the current thread.
      *
-     * @param action an action to be performed after the stream has been closed,
-     *        not {@code null}
+     * @param action an action to perform after the stream is closed, not
+     *        {@code null}
+     * @see #close()
      */
     void onClose(Runnable action);
 
     /**
-     * Releases all resources associated with this event stream.
+     * Releases all resources associated with this stream.
      */
     void close();
 
     /**
-     * Removes an action from the stream.
+     * Unregisters an action.
      * <p>
-     * If the action has been added multiple times, all instance of it will be
-     * removed.
+     * If the action has been registered multiple times, all instances are
+     * unregistered.
      *
-     * @param action the action to remove, not {@code null}
+     * @param action the action to unregister, not {@code null}
      *
-     * @return {@code true} if the action was removed, {@code false} otherwise
+     * @return {@code true} if the action was unregistered, {@code false}
+     *         otherwise
      *
      * @see #onEvent(Consumer)
      * @see #onEvent(String, Consumer)
@@ -183,91 +235,98 @@
 
     /**
      * Specifies that the event object in an {@link #onEvent(Consumer)} action
-     * is to be reused.
+     * can be reused.
      * <p>
-     * If reuse is set to {@code true), a callback should not keep a reference
-     * to the event object after the callback from {@code onEvent} has returned.
+     * If reuse is set to {@code true), an action should not keep a reference
+     * to the event object after the action has completed.
      *
-     * @param resuse if event objects can be reused between calls to
-     * {@code #onEvent(Consumer)}
-     *
+     * @param reuse {@code true} if an event object can be reused, {@code false}
+     * otherwise
      */
-    public void setReuse(boolean reuse);
+    void setReuse(boolean reuse);
 
     /**
      * Specifies that events arrives in chronological order, sorted by the time
-     * they were committed to the event stream.
+     * they were committed to the stream.
      *
      * @param ordered if event objects arrive in chronological order to
      *        {@code #onEvent(Consumer)}
      */
-    public void setOrdered(boolean ordered);
+    void setOrdered(boolean ordered);
 
     /**
-     * Specifies start time of the event stream.
+     * Specifies the start time of the stream.
      * <p>
-     * The start time must be set before the stream is started.
+     * The start time must be set before starting the stream
      *
      * @param startTime the start time, not {@code null}
      *
-     * @throws IllegalStateException if the stream has already been started
+     * @throws IllegalStateException if the stream is already started
+     *
+     * @see #start()
+     * @see #startAsync()
      */
-    public void setStartTime(Instant startTime);
+    void setStartTime(Instant startTime);
 
     /**
-     * Specifies end time of the event stream.
+     * Specifies the end time of the stream.
      * <p>
-     * The end time must be set before the stream is started.
+     * The end time must be set before starting the stream.
      * <p>
-     * When the end time is reached the stream is closed.
+     * At end time, the stream is closed.
      *
      * @param endTime the end time, not {@code null}
      *
-     * @throws IllegalStateException if the stream has already been started
+     * @throws IllegalStateException if the stream is already started
+     *
+     * @see #start()
+     * @see #startAsync()
      */
-    public void setEndTime(Instant endTime);
+    void setEndTime(Instant endTime);
 
     /**
-     * Start processing events in the stream.
+     * Start processing of actions.
      * <p>
-     * All actions performed on this stream will happen in the current thread.
+     * Actions are performed in the current thread.
      *
-     * @throws IllegalStateException if the stream is already started or if it
-     *         has been closed
+     * @throws IllegalStateException if the stream is already started or closed
      */
     void start();
 
     /**
-     * Start processing events in the stream asynchronously.
+     * Start asynchronous processing of actions.
      * <p>
-     * All actions on this stream will be performed in a separate thread.
+     * Actions are performed in a single separate thread.
      *
-     * @throws IllegalStateException if the stream is already started, or if it
-     *         has been closed
+     * @throws IllegalStateException if the stream is already started or closed
      */
     void startAsync();
 
     /**
-     * Blocks the current thread until the stream is finished, closed, or it
-     * times out.
+     * Blocks until all actions are completed, or the stream is closed, or the
+     * timeout occurs, or the current thread is interrupted, whichever happens
+     * first.
      *
      * @param timeout the maximum time to wait, not {@code null}
      *
      * @throws IllegalArgumentException if timeout is negative
-     * @throws InterruptedException
+     * @throws InterruptedException if interrupted while waiting
      *
      * @see #start()
      * @see #startAsync()
+     * @see Thread#interrupt()
      */
     void awaitTermination(Duration timeout) throws InterruptedException;
 
     /**
-     * Blocks the current thread until the stream is finished or closed.
+     * Blocks until all actions are completed, or the stream is closed, or the
+     * current thread is interrupted, whichever happens first.
      *
-     * @throws InterruptedException
+     * @throws InterruptedException if interrupted while waiting
      *
      * @see #start()
      * @see #startAsync()
+     * @see Thread#interrupt()
      */
     void awaitTermination() throws InterruptedException;
 }
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java	Fri Sep 13 18:46:07 2019 +0200
@@ -238,7 +238,7 @@
     private void findNext() throws IOException {
         while (nextEvent == null) {
             if (chunkParser == null) {
-                chunkParser = new ChunkParser(input, false);
+                chunkParser = new ChunkParser(input);
             } else if (!chunkParser.isLastChunk()) {
                 chunkParser = chunkParser.nextChunkParser();
             } else {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Fri Sep 13 18:46:07 2019 +0200
@@ -47,7 +47,7 @@
  * A recording stream produces events from the current JVM (Java Virtual
  * Machine).
  * <p>
- * The following example, shows how to record events using the default
+ * The following example shows how to record events using the default
  * configuration and print the Garbage Collection, CPU Load and JVM Information
  * event to standard out.
  *
@@ -110,7 +110,7 @@
      * </code>
      * </pre>
      *
-     * @param configuration configuration that contains the settings to be use,
+     * @param configuration configuration that contains the settings to use,
      *        not {@code null}
      *
      * @throws IllegalStateException if Flight Recorder can't be created (for
@@ -146,16 +146,17 @@
     }
 
     /**
-     * Replaces all settings for this recording stream
+     * Replaces all settings for this recording stream.
      * <p>
-     * The following example records 20 second using the "default" configuration
-     * and then changes to settings for the "profile" configuration.
+     * The following example records 20 seconds using the "default" configuration
+     * and then changes settings to the "profile" configuration.
      *
      * <pre>
      * <code>
-     *     var defaultConfiguration = Configuration.getConfiguration("default");
-     *     var profileConfiguration = Configuration.getConfiguration("profile");
-     *     try (var rs = new RecordingStream(defaultConfiguration) {
+     *     Configuration defaultConfiguration = Configuration.getConfiguration("default");
+     *     Configuration profileConfiguration = Configuration.getConfiguration("profile");
+     *     try (RecordingStream rs = new RecordingStream(defaultConfiguration) {
+     *        rs.onEvent(System.out::println);
      *        rs.startAsync();
      *        Thread.sleep(20_000);
      *        rs.setSettings(profileConfiguration.getSettings());
@@ -165,6 +166,8 @@
      * </pre>
      *
      * @param settings the settings to set, not {@code null}
+     *
+     * @see Recording#setSettings(Map)
      */
     public void setSettings(Map<String, String> settings) {
         recording.setSettings(settings);
@@ -217,8 +220,7 @@
     }
 
     /**
-     * Determines how far back data is kept for the stream, if the stream can't
-     * keep up.
+     * Determines how far back data is kept for the stream.
      * <p>
      * To control the amount of recording data stored on disk, the maximum
      * length of time to retain the data can be specified. Data stored on disk
@@ -241,8 +243,7 @@
     }
 
     /**
-     * Determines how much data is kept in the disk repository if the stream
-     * can't keep up.
+     * Determines how much data is kept for the stream.
      * <p>
      * To control the amount of recording data that is stored on disk, the
      * maximum amount of data to retain can be specified. When the maximum limit
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java	Fri Sep 13 18:46:07 2019 +0200
@@ -2,267 +2,101 @@
 
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.function.Consumer;
 
-import jdk.jfr.EventType;
-import jdk.jfr.consumer.AbstractEventStream.EventDispatcher;
-import jdk.jfr.internal.LongMap;
+import jdk.jfr.consumer.Dispatcher.EventDispatcher;
 import jdk.jfr.internal.Utils;
-import jdk.jfr.internal.consumer.InternalEventFilter;
 
 final class StreamConfiguration {
-    private static final Runnable[] NO_ACTIONS = new Runnable[0];
-
-    Consumer<?>[] errorActions = new Consumer<?>[0];
-    private Runnable[] flushActions = NO_ACTIONS;
-    private Runnable[] closeActions = NO_ACTIONS;
-    private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
-    private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
-    LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+    final List<Runnable> closeActions = new ArrayList<>();
+    final List<Runnable> flushActions = new ArrayList<>();
+    final List<EventDispatcher> eventActions = new ArrayList<>();
+    final List<Consumer<Throwable>> errorActions = new ArrayList<>();
 
-    private boolean changedConfiguration = false;
-    private boolean closed = false;
-    private boolean reuse = true;
-    private boolean ordered = true;
-    private Instant startTime = null;
-    private Instant endTime = null;
-    private boolean started = false;
-    private long startNanos = 0;
-    private long endNanos = Long.MAX_VALUE;
+    boolean reuse = true;
+    boolean ordered = true;
+    Instant startTime = null;
+    Instant endTime = null;
+    boolean started = false;
+    long startNanos = 0;
+    long endNanos = Long.MAX_VALUE;
 
-    // Cache the last event type and dispatch.
-    EventType cacheEventType;
-    EventDispatcher[] cacheDispatchers;
-
+    volatile boolean changed = true;
 
-    public StreamConfiguration(StreamConfiguration configuration) {
-        this.flushActions = configuration.flushActions;
-        this.closeActions = configuration.closeActions;
-        this.errorActions = configuration.errorActions;
-        this.dispatchers = configuration.dispatchers;
-        this.eventFilter = configuration.eventFilter;
-        this.closed = configuration.closed;
-        this.reuse = configuration.reuse;
-        this.ordered = configuration.ordered;
-        this.startTime = configuration.startTime;
-        this.endTime = configuration.endTime;
-        this.started = configuration.started;
-        this.startNanos = configuration.startNanos;
-        this.endNanos = configuration.endNanos;
-        this.dispatcherLookup = configuration.dispatcherLookup;
+    public synchronized boolean remove(Object action) {
+        boolean removed = false;
+        removed |= flushActions.removeIf(e -> e == action);
+        removed |= closeActions.removeIf(e -> e == action);
+        removed |= errorActions.removeIf(e -> e == action);
+        removed |= eventActions.removeIf(e -> e.action == action);
+        if (removed) {
+            changed = true;
+        }
+        return removed;
     }
 
-    public StreamConfiguration() {
-    }
-
-    public StreamConfiguration remove(Object action) {
-        flushActions = remove(flushActions, action);
-        closeActions = remove(closeActions, action);
-        errorActions = remove(errorActions, action);
-        dispatchers = removeDispatch(dispatchers, action);
-        return this;
-    }
-
-    public StreamConfiguration addDispatcher(EventDispatcher e) {
-        dispatchers = add(dispatchers, e);
-        eventFilter = buildFilter(dispatchers);
-        dispatcherLookup = new LongMap<>();
-        return this;
-    }
-
-    public StreamConfiguration addFlushAction(Runnable action) {
-        flushActions = add(flushActions, action);
-        return this;
+    public synchronized void addEventAction(String name, Consumer<RecordedEvent> consumer) {
+        eventActions.add(new EventDispatcher(name, consumer));
+        changed = true;
     }
 
-    public StreamConfiguration addCloseAction(Runnable action) {
-        closeActions = add(closeActions, action);
-        return this;
-    }
-
-    public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
-        errorActions = add(errorActions, action);
-        return this;
-    }
-
-    public StreamConfiguration setClosed(boolean closed) {
-        this.closed = closed;
-        changedConfiguration = true;
-        return this;
-    }
-
-    public boolean isClosed() {
-        return closed;
-    }
-
-    public Runnable[] getCloseActions() {
-        return closeActions;
+    public void addEventAction(Consumer<RecordedEvent> action) {
+        addEventAction(null, action);
     }
 
-    public Runnable[] getFlushActions() {
-        return flushActions;
+    public synchronized void addFlushAction(Runnable action) {
+        flushActions.add(action);
+        changed = true;
     }
 
-    private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
-        List<EventDispatcher> list = new ArrayList<>(array.length);
-        boolean modified = false;
-        for (int i = 0; i < array.length; i++) {
-            if (array[i].action != action) {
-                list.add(array[i]);
-            } else {
-                modified = true;
-            }
-        }
-        EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
-        if (modified) {
-            eventFilter = buildFilter(result);
-            dispatcherLookup = new LongMap<>();
-            changedConfiguration = true;
-        }
-        return result;
+    public synchronized void addCloseAction(Runnable action) {
+        closeActions.add(action);
+        changed = true;
     }
 
-    private <T> T[] remove(T[] array, Object action) {
-        List<T> list = new ArrayList<>(array.length);
-        for (int i = 0; i < array.length; i++) {
-            if (array[i] != action) {
-                list.add(array[i]);
-            } else {
-                changedConfiguration = true;
-            }
-        }
-        return list.toArray(array);
+    public synchronized void addErrorAction(Consumer<Throwable> action) {
+        errorActions.add(action);
+        changed = true;
     }
 
-    private <T> T[] add(T[] array, T object) {
-        List<T> list = new ArrayList<>(Arrays.asList(array));
-        list.add(object);
-        changedConfiguration = true;
-        return list.toArray(array);
+    public synchronized void setReuse(boolean reuse) {
+        this.reuse = reuse;
+        changed = true;
     }
 
-    private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
-        InternalEventFilter ef = new InternalEventFilter();
-        for (EventDispatcher ed : dispatchers) {
-            String name = ed.eventName;
-            if (name == null) {
-                return InternalEventFilter.ACCEPT_ALL;
-            }
-            ef.setThreshold(name, 0);
-        }
-        return ef;
+    public synchronized void setOrdered(boolean ordered) {
+        this.ordered = ordered;
+        changed = true;
     }
 
-    public StreamConfiguration setReuse(boolean reuse) {
-        this.reuse = reuse;
-        changedConfiguration = true;
-        return this;
-    }
-
-    public StreamConfiguration setOrdered(boolean ordered) {
-        this.ordered = ordered;
-        changedConfiguration = true;
-        return this;
-    }
-
-    public StreamConfiguration setEndTime(Instant endTime) {
+    public synchronized void setEndTime(Instant endTime) {
         this.endTime = endTime;
         this.endNanos = Utils.timeToNanos(endTime);
-        changedConfiguration = true;
-        return this;
+        changed = true;
     }
 
-    public StreamConfiguration setStartTime(Instant startTime) {
+    public synchronized void setStartTime(Instant startTime) {
         this.startTime = startTime;
         this.startNanos = Utils.timeToNanos(startTime);
-        changedConfiguration = true;
-        return this;
-    }
-
-    public Instant getStartTime() {
-        return startTime;
+        changed = true;
     }
 
-    public Object getEndTime() {
-        return endTime;
-    }
-
-    public boolean isStarted() {
-        return started;
+    public synchronized void setStartNanos(long startNanos) {
+        this.startNanos = startNanos;
+        changed = true;
     }
 
-    public StreamConfiguration setStartNanos(long startNanos) {
-        this.startNanos = startNanos;
-        changedConfiguration = true;
-        return this;
-    }
-
-    public void setStarted(boolean started) {
+    public synchronized void setStarted(boolean started) {
         this.started = started;
-        changedConfiguration = true;
+        changed = true;
     }
 
     public boolean hasChanged() {
-        return changedConfiguration;
-    }
-
-    public boolean getReuse() {
-        return reuse;
-    }
-
-    public boolean getOrdered() {
-        return ordered;
-    }
-
-    public InternalEventFilter getFiler() {
-        return eventFilter;
-    }
-
-    public long getStartNanos() {
-        return startNanos;
-    }
-
-    public long getEndNanos() {
-        return endNanos;
-    }
-
-    public InternalEventFilter getFilter() {
-        return eventFilter;
+        return changed;
     }
 
-    public void clearDispatchCache() {
-        cacheDispatchers = null;
-        cacheEventType = null;
-    }
-
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        for (Runnable flush : flushActions) {
-            sb.append("Flush Action: ").append(flush).append("\n");
-        }
-        for (Runnable close : closeActions) {
-            sb.append("Close Action: " + close + "\n");
-        }
-        for (Consumer<?> error : errorActions) {
-            sb.append("Error Action: " + error + "\n");
-        }
-        for (EventDispatcher dispatcher : dispatchers) {
-            sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
-        }
-        sb.append("Closed: ").append(closed).append("\n");
-        sb.append("Reuse: ").append(reuse).append("\n");
-        sb.append("Ordered: ").append(ordered).append("\n");
-        sb.append("Started: ").append(started).append("\n");
-        sb.append("Start Time: ").append(startTime).append("\n");
-        sb.append("Start Nanos: ").append(startNanos).append("\n");
-        sb.append("End Time: ").append(endTime).append("\n");
-        sb.append("End Nanos: ").append(endNanos).append("\n");
-        return sb.toString();
-    }
-
-    EventDispatcher[] getDispatchers() {
-        return dispatchers;
+    public synchronized void clearChanged() {
+        changed = false;
     }
 }
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/EventInstrumentation.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/EventInstrumentation.java	Fri Sep 13 18:46:07 2019 +0200
@@ -121,7 +121,7 @@
     private final boolean untypedEventHandler;
     private boolean guardHandlerReference;
     private Class<?> superClass;
-    private final static boolean streamingCommit = true; //!SecuritySupport.getBooleanProperty("jfr.instrument.streaming");
+    private final static boolean streamingCommit = false; //!SecuritySupport.getBooleanProperty("jfr.instrument.streaming");
 
     EventInstrumentation(Class<?> superClass, byte[] bytes, long id) {
         this.superClass = superClass;
--- a/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/Utils.java	Fri Sep 13 18:46:07 2019 +0200
@@ -601,6 +601,7 @@
         try {
             Thread.sleep(millis);
         } catch (InterruptedException e) {
+            // ok
         }
     }
 
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java	Thu Sep 12 20:46:55 2019 -0700
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestClose.java	Fri Sep 13 18:46:07 2019 +0200
@@ -78,7 +78,7 @@
         RecordingStream r = new RecordingStream();
         AtomicLong count = new AtomicLong();
         r.onEvent(e -> {
-            if (count.incrementAndGet() == 100) {
+            if (count.incrementAndGet() > 100) {
                 streaming.countDown();
             }
         });