Improved handling of Thread.interrupt() + cleanup JEP-349-branch
authoregahlin
Thu, 05 Sep 2019 16:46:50 +0200
branchJEP-349-branch
changeset 58020 f082177c5023
parent 57997 8dea18a54031
child 58049 10ecdb5d3574
Improved handling of Thread.interrupt() + cleanup
src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/ObjectContext.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java
src/jdk.jfr/share/classes/jdk/jfr/consumer/StringParser.java
test/jdk/jdk/jfr/api/consumer/recordingstream/TestAwaitTermination.java
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/AbstractEventStream.java	Thu Sep 05 16:46:50 2019 +0200
@@ -33,10 +33,10 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 import jdk.jfr.EventType;
@@ -44,10 +44,7 @@
 import jdk.jfr.internal.LogLevel;
 import jdk.jfr.internal.LogTag;
 import jdk.jfr.internal.Logger;
-import jdk.jfr.internal.LongMap;
 import jdk.jfr.internal.SecuritySupport;
-import jdk.jfr.internal.Utils;
-import jdk.jfr.internal.consumer.InternalEventFilter;
 
 /*
  * Purpose of this class is to simplify the implementation of
@@ -62,254 +59,10 @@
  */
 abstract class AbstractEventStream implements EventStream {
 
-    protected static final class StreamConfiguration {
-        private static final Runnable[] NO_ACTIONS = new Runnable[0];
-
-        private Consumer<?>[] errorActions = new Consumer<?>[0];
-        private Runnable[] flushActions = NO_ACTIONS;
-        private Runnable[] closeActions = NO_ACTIONS;
-        private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
-        private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
-        private LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
-
-        private boolean changedConfiguration = false;
-        private boolean closed = false;
-        private boolean reuse = true;
-        private boolean ordered = true;
-        private Instant startTime = null;
-        private Instant endTime = null;
-        private boolean started = false;
-        private long startNanos = 0;
-        private long endNanos = Long.MAX_VALUE;
-
-        public StreamConfiguration(StreamConfiguration configuration) {
-            this.flushActions = configuration.flushActions;
-            this.closeActions = configuration.closeActions;
-            this.errorActions = configuration.errorActions;
-            this.dispatchers = configuration.dispatchers;
-            this.eventFilter = configuration.eventFilter;
-            this.closed = configuration.closed;
-            this.reuse = configuration.reuse;
-            this.ordered = configuration.ordered;
-            this.startTime = configuration.startTime;
-            this.endTime = configuration.endTime;
-            this.started = configuration.started;
-            this.startNanos = configuration.startNanos;
-            this.endNanos = configuration.endNanos;
-            this.dispatcherLookup = configuration.dispatcherLookup;
-        }
-
-        public StreamConfiguration() {
-        }
-
-        public StreamConfiguration remove(Object action) {
-            flushActions = remove(flushActions, action);
-            closeActions = remove(closeActions, action);
-            dispatchers = removeDispatch(dispatchers, action);
-            return this;
-        }
-
-        public StreamConfiguration addDispatcher(EventDispatcher e) {
-            dispatchers = add(dispatchers, e);
-            eventFilter = buildFilter(dispatchers);
-            dispatcherLookup = new LongMap<>();
-            return this;
-        }
-
-        public StreamConfiguration addFlushAction(Runnable action) {
-            flushActions = add(flushActions, action);
-            return this;
-        }
-
-        public StreamConfiguration addCloseAction(Runnable action) {
-            closeActions = add(closeActions, action);
-            return this;
-        }
-
-        public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
-            errorActions = add(errorActions, action);
-            return this;
-        }
-
-        public StreamConfiguration setClosed(boolean closed) {
-            this.closed = closed;
-            changedConfiguration = true;
-            return this;
-        }
-
-        public boolean isClosed() {
-            return closed;
-        }
-
-        public Runnable[] getCloseActions() {
-            return closeActions;
-        }
-
-        public Runnable[] getFlushActions() {
-            return flushActions;
-        }
-
-        private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
-            List<EventDispatcher> list = new ArrayList<>(array.length);
-            boolean modified = false;
-            for (int i = 0; i < array.length; i++) {
-                if (array[i].action != action) {
-                    list.add(array[i]);
-                } else {
-                    modified = true;
-                }
-            }
-            EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
-            if (modified) {
-                eventFilter = buildFilter(result);
-                dispatcherLookup = new LongMap<>();
-                changedConfiguration = true;
-            }
-            return result;
-        }
-
-        private <T> T[] remove(T[] array, Object action) {
-            List<T> list = new ArrayList<>(array.length);
-            for (int i = 0; i < array.length; i++) {
-                if (array[i] != action) {
-                    list.add(array[i]);
-                } else {
-                    changedConfiguration = true;
-                }
-            }
-            return list.toArray(array);
-        }
-
-        private <T> T[] add(T[] array, T object) {
-            List<T> list = new ArrayList<>(Arrays.asList(array));
-            list.add(object);
-            changedConfiguration = true;
-            return list.toArray(array);
-        }
-
-        private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
-            InternalEventFilter ef = new InternalEventFilter();
-            for (EventDispatcher ed : dispatchers) {
-                String name = ed.eventName;
-                if (name == null) {
-                    return InternalEventFilter.ACCEPT_ALL;
-                }
-                ef.setThreshold(name, 0);
-            }
-            return ef;
-        }
-
-        public StreamConfiguration setReuse(boolean reuse) {
-            this.reuse = reuse;
-            changedConfiguration = true;
-            return this;
-        }
-
-        public StreamConfiguration setOrdered(boolean ordered) {
-            this.ordered = ordered;
-            changedConfiguration = true;
-            return this;
-        }
-
-        public StreamConfiguration setEndTime(Instant endTime) {
-            this.endTime = endTime;
-            this.endNanos = Utils.timeToNanos(endTime);
-            changedConfiguration = true;
-            return this;
-        }
-
-        public StreamConfiguration setStartTime(Instant startTime) {
-            this.startTime = startTime;
-            this.startNanos = Utils.timeToNanos(startTime);
-            changedConfiguration = true;
-            return this;
-        }
-
-        public Instant getStartTime() {
-            return startTime;
-        }
-
-        public Object getEndTime() {
-            return endTime;
-        }
-
-        public boolean isStarted() {
-            return started;
-        }
-
-        public StreamConfiguration setStartNanos(long startNanos) {
-            this.startNanos = startNanos;
-            changedConfiguration = true;
-            return this;
-        }
-
-        public void setStarted(boolean started) {
-            this.started = started;
-            changedConfiguration = true;
-        }
-
-        public boolean hasChanged() {
-            return changedConfiguration;
-        }
-
-        public boolean getReuse() {
-            return reuse;
-        }
-
-        public boolean getOrdered() {
-            return ordered;
-        }
-
-        public InternalEventFilter getFiler() {
-            return eventFilter;
-        }
-
-        public long getStartNanos() {
-            return startNanos;
-        }
-
-        public long getEndNanos() {
-            return endNanos;
-        }
-
-        public InternalEventFilter getFilter() {
-            return eventFilter;
-        }
-
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            for (Runnable flush : flushActions) {
-                sb.append("Flush Action: ").append(flush).append("\n");
-            }
-            for (Runnable close : closeActions) {
-                sb.append("Close Action: " + close + "\n");
-            }
-            for (Consumer<?> error : errorActions) {
-                sb.append("Error Action: " + error + "\n");
-            }
-            for (EventDispatcher dispatcher : dispatchers) {
-                sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
-            }
-            sb.append("Closed: ").append(closed).append("\n");
-            sb.append("Reuse: ").append(reuse).append("\n");
-            sb.append("Ordered: ").append(ordered).append("\n");
-            sb.append("Started: ").append(started).append("\n");
-            sb.append("Start Time: ").append(startTime).append("\n");
-            sb.append("Start Nanos: ").append(startNanos).append("\n");
-            sb.append("End Time: ").append(endTime).append("\n");
-            sb.append("End Nanos: ").append(endNanos).append("\n");
-            return sb.toString();
-        }
-
-        private EventDispatcher[] getDispatchers() {
-            return dispatchers;
-        }
-    }
-
-    private final static class EventDispatcher {
+    final static class EventDispatcher {
         final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
-        final private String eventName;
-        final private Consumer<RecordedEvent> action;
+        final String eventName;
+        final Consumer<RecordedEvent> action;
 
         public EventDispatcher(Consumer<RecordedEvent> action) {
             this(null, action);
@@ -330,8 +83,9 @@
     }
 
     final static Comparator<? super RecordedEvent> END_TIME = (e1, e2) -> Long.compare(e1.endTimeTicks, e2.endTimeTicks);
-
-    private final Thread thread;
+    private final static AtomicLong counter = new AtomicLong(1);
+    private volatile Thread thread;
+    private final Object terminated = new Object();
     private final boolean active;
     private final Runnable flushOperation = () -> runFlushActions();
     private final AccessControlContext accessControllerContext;
@@ -340,14 +94,9 @@
     // Modified by updateConfiguration()
     protected volatile StreamConfiguration configuration = new StreamConfiguration();
 
-    // Cache the last event type and dispatch.
-    private EventType lastEventType;
-    private EventDispatcher[] lastEventDispatch;
-
     public AbstractEventStream(AccessControlContext acc, boolean active) throws IOException {
         this.accessControllerContext = Objects.requireNonNull(acc);
         this.active = active;
-        this.thread = SecuritySupport.createThreadWitNoPermissions("JFR Event Streaming", () -> run(acc));
     }
 
     @Override
@@ -360,7 +109,7 @@
     abstract public void close();
 
     // Purpose of synchronizing the following methods is
-    // to serialize changes to the configuration, so only one
+    // to serialize changes to the configuration so only one
     // thread at a time can change the configuration.
     //
     // The purpose is not to guard the configuration field. A new
@@ -455,47 +204,67 @@
     }
 
     @Override
-    public final void awaitTermination() {
+    public final void awaitTermination() throws InterruptedException {
         awaitTermination(Duration.ofMillis(0));
     }
 
     @Override
-    public final void awaitTermination(Duration timeout) {
+    public final void awaitTermination(Duration timeout) throws InterruptedException {
         Objects.requireNonNull(timeout);
-        if (thread != Thread.currentThread()) {
-            try {
-                thread.join(timeout.toMillis());
-            } catch (InterruptedException e) {
-                // ignore
+        if (timeout.isNegative()) {
+            throw new IllegalArgumentException("timeout value is negative");
+        }
+
+        long base = System.currentTimeMillis();
+        long now = 0;
+
+        long millis;
+        try {
+            millis = Math.multiplyExact(timeout.getSeconds(), 1000);
+        } catch (ArithmeticException a) {
+            millis = Long.MAX_VALUE;
+        }
+        int nanos = timeout.toNanosPart();
+        if (nanos == 0 && millis == 0) {
+            synchronized (terminated) {
+                while (!isClosed()) {
+                    terminated.wait(0);
+                }
+            }
+        } else {
+            while (!isClosed()) {
+                long delay = millis - now;
+                if (delay <= 0) {
+                    break;
+                }
+                synchronized (terminated) {
+                    terminated.wait(delay, nanos);
+                }
+                now = System.currentTimeMillis() - base;
             }
         }
     }
 
     protected abstract void process() throws Exception;
 
-    protected final void clearLastDispatch() {
-        lastEventDispatch = null;
-        lastEventType = null;
-    }
-
-    protected final void dispatch(RecordedEvent event) {
+    protected final void dispatch(StreamConfiguration c, RecordedEvent event) {
         EventType type = event.getEventType();
         EventDispatcher[] dispatchers = null;
-        if (type == lastEventType) {
-            dispatchers = lastEventDispatch;
+        if (type == c.cacheEventType) {
+            dispatchers = c.cacheDispatchers;
         } else {
-            dispatchers = configuration.dispatcherLookup.get(type.getId());
+            dispatchers = c.dispatcherLookup.get(type.getId());
             if (dispatchers == null) {
                 List<EventDispatcher> list = new ArrayList<>();
-                for (EventDispatcher e : configuration.getDispatchers()) {
+                for (EventDispatcher e : c.getDispatchers()) {
                     if (e.accepts(type)) {
                         list.add(e);
                     }
                 }
                 dispatchers = list.isEmpty() ? EventDispatcher.NO_DISPATCHERS : list.toArray(new EventDispatcher[0]);
-                configuration.dispatcherLookup.put(type.getId(), dispatchers);
+                c.dispatcherLookup.put(type.getId(), dispatchers);
             }
-            lastEventDispatch = dispatchers;
+            c.cacheDispatchers = dispatchers;
         }
         for (int i = 0; i < dispatchers.length; i++) {
             try {
@@ -529,11 +298,14 @@
 
     protected final void startAsync(long startNanos) {
         startInternal(startNanos);
+        Runnable r = () -> run(accessControllerContext);
+        thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r);
         thread.start();
     }
 
     protected final void start(long startNanos) {
         startInternal(startNanos);
+        thread = Thread.currentThread();
         run(accessControllerContext);
     }
 
@@ -580,6 +352,13 @@
             defaultErrorHandler(e);
         } finally {
             Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
+            try {
+                close();
+            } finally {
+                synchronized (terminated) {
+                    terminated.notifyAll();
+                }
+            }
         }
     }
 
@@ -591,15 +370,11 @@
         }
         for (int i = 0; i < consumers.length; i++) {
             @SuppressWarnings("unchecked")
-            Consumer<Throwable> c = (Consumer<Throwable>) consumers[i];
-            c.accept(e);
+            Consumer<Throwable> conusmer = (Consumer<Throwable>) consumers[i];
+            conusmer.accept(e);
         }
     }
 
-    private void defaultErrorHandler(Throwable e) {
-        e.printStackTrace();
-    }
-
     private void runFlushActions() {
         Runnable[] flushActions = configuration.getFlushActions();
         for (int i = 0; i < flushActions.length; i++) {
@@ -611,13 +386,22 @@
         }
     }
 
-    private void run(AccessControlContext acc) {
+    private void run(AccessControlContext accessControlContext) {
         AccessController.doPrivileged(new PrivilegedAction<Void>() {
             @Override
             public Void run() {
                 execute();
                 return null;
             }
-        }, acc);
+        }, accessControlContext);
+    }
+
+    private String nextThreadName() {
+        counter.incrementAndGet();
+        return "JFR Event Stream " + counter;
+    }
+
+    private void defaultErrorHandler(Throwable e) {
+        e.printStackTrace();
     }
 }
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ChunkParser.java	Thu Sep 05 16:46:50 2019 +0200
@@ -54,7 +54,7 @@
     static final byte CHECKPOINT_CHUNK_HEADER_MASK = 2;
     // Checkpoint contains only statics that will not change from chunk to chunk
     static final byte CHECKPOINT_STATICS_MASK = 4;
-    // Checkpoint contains thread realted information
+    // Checkpoint contains thread related information
     static final byte CHECKPOINT_THREADS_MASK = 8;
 
     private static final long CONSTANT_POOL_TYPE_ID = 1;
@@ -209,7 +209,7 @@
 
     private void parseCheckpoint() throws IOException {
         // Content has been parsed previously. This
-        // is for triggering flsuh
+        // is to trigger flush
         input.readLong(); // timestamp
         input.readLong(); // duration
         input.readLong(); // delta
@@ -449,6 +449,4 @@
     public long getStartNanos() {
         return chunkHeader.getStartNanos();
     }
-
-
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ConstantLookup.java	Thu Sep 05 16:46:50 2019 +0200
@@ -3,7 +3,6 @@
 import jdk.jfr.internal.Type;
 
 final class ConstantLookup {
-
     private final Type type;
     private ConstantMap current;
     private ConstantMap previous = ConstantMap.EMPTY;
@@ -24,7 +23,6 @@
     public void newPool() {
         previous = current;
         current = new ConstantMap(current.factory, current.name);
-     //   previous =  new ConstantMap(); // disable cache
     }
 
     public Object getPreviousResolved(long key) {
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java	Thu Sep 05 16:46:50 2019 +0200
@@ -50,7 +50,7 @@
     private long chunkStartNanos;
     private RecordedEvent[] sortedList;
 
-    public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
+    EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, boolean active) throws IOException {
         super(acc, active);
         this.fileAccess = Objects.requireNonNull(fileAccess);
         this.active = active;
@@ -75,7 +75,7 @@
     }
 
     @Override
-    public void process() throws Exception {
+    protected void process() throws Exception {
         StreamConfiguration c = configuration;
         Path path;
         boolean validStartTime = active || c.getStartTime() != null;
@@ -106,11 +106,11 @@
                     chunkParser.resetEventCache();
                     chunkParser.setParserFilter(c.getFilter());
                     chunkParser.updateEventParsers();
-                    clearLastDispatch();
+                    c.clearDispatchCache();
                     if (ordered) {
-                        awaitnewEvent = processOrdered(awaitnewEvent);
+                        awaitnewEvent = processOrdered(c, awaitnewEvent);
                     } else {
-                        awaitnewEvent = processUnordered(awaitnewEvent);
+                        awaitnewEvent = processUnordered(c, awaitnewEvent);
                     }
                     if (chunkParser.getStartNanos() + chunkParser.getChunkDuration() > filterEnd) {
                         close();
@@ -135,7 +135,7 @@
         }
     }
 
-    private boolean processOrdered(boolean awaitNewEvents) throws IOException {
+    private boolean processOrdered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
         if (sortedList == null) {
             sortedList = new RecordedEvent[100_000];
         }
@@ -164,18 +164,18 @@
             Arrays.sort(sortedList, 0, index, END_TIME);
         }
         for (int i = 0; i < index; i++) {
-            dispatch(sortedList[i]);
+            dispatch(c, sortedList[i]);
         }
         return awaitNewEvents;
     }
 
-    private boolean processUnordered(boolean awaitNewEvents) throws IOException {
+    private boolean processUnordered(StreamConfiguration c, boolean awaitNewEvents) throws IOException {
         while (true) {
             RecordedEvent e = chunkParser.readStreamingEvent(awaitNewEvents);
             if (e == null) {
                 return true;
             } else {
-                dispatch(e);
+                dispatch(c, e);
             }
         }
     }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java	Thu Sep 05 16:46:50 2019 +0200
@@ -43,7 +43,7 @@
     private ChunkParser chunkParser;
     private RecordedEvent[] sortedList;
 
-    public EventFileStream(AccessControlContext acc, Path path) throws IOException {
+    EventFileStream(AccessControlContext acc, Path path) throws IOException {
         super(acc, false);
         Objects.requireNonNull(path);
         this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILIGED);
@@ -71,7 +71,7 @@
     }
 
     @Override
-    public void process() throws IOException {
+    protected void process() throws IOException {
         StreamConfiguration c = configuration;
         long start = 0;
         long end = Long.MAX_VALUE;
@@ -98,11 +98,11 @@
             chunkParser.resetEventCache();
             chunkParser.setParserFilter(c.getFiler());
             chunkParser.updateEventParsers();
-            clearLastDispatch();
+            c.clearDispatchCache();
             if (ordered) {
-                processOrdered();
+                processOrdered(c);
             } else {
-                processUnordered();
+                processUnordered(c);
             }
             if (chunkParser.isLastChunk()) {
                 return;
@@ -111,7 +111,7 @@
         }
     }
 
-    private void processOrdered() throws IOException {
+    private void processOrdered(StreamConfiguration c) throws IOException {
         if (sortedList == null) {
             sortedList = new RecordedEvent[10_000];
         }
@@ -122,7 +122,7 @@
             if (event == null) {
                 Arrays.sort(sortedList, 0, index, END_TIME);
                 for (int i = 0; i < index; i++) {
-                    dispatch(sortedList[i]);
+                    dispatch(c, sortedList[i]);
                 }
                 return;
             }
@@ -135,13 +135,13 @@
         }
     }
 
-    private void processUnordered() throws IOException {
+    private void processUnordered(StreamConfiguration c) throws IOException {
         while (!isClosed()) {
             RecordedEvent event = chunkParser.readEvent();
             if (event == null) {
                 return;
             }
-            dispatch(event);
+            dispatch(c, event);
         }
     }
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventParser.java	Thu Sep 05 16:46:50 2019 +0200
@@ -50,9 +50,10 @@
     private final RecordedEvent unorderedEvent;
     private final ObjectContext objectContext;
 
+    private RecordedEvent[] cached;
+    private int cacheIndex;
+
     private boolean enabled = true;
-    private RecordedEvent[] eventCache;
-    private int index;
     private boolean ordered;
     private long filterStart;
     private long filterEnd = Long.MAX_VALUE;
@@ -72,17 +73,17 @@
 
     private RecordedEvent cachedEvent() {
         if (ordered) {
-            if (index == eventCache.length) {
-                RecordedEvent[] cache = eventCache;
-                eventCache = new RecordedEvent[eventCache.length * 2];
-                System.arraycopy(cache, 0, eventCache, 0, cache.length);
+            if (cacheIndex == cached.length) {
+                RecordedEvent[] old = cached;
+                cached = new RecordedEvent[cached.length * 2];
+                System.arraycopy(old, 0, cached, 0, old.length);
             }
-            RecordedEvent event = eventCache[index];
+            RecordedEvent event = cached[cacheIndex];
             if (event == null) {
                 event = new RecordedEvent(objectContext, new Object[length], 0L, 0L);
-                eventCache[index] = event;
+                cached[cacheIndex] = event;
             }
-            index++;
+            cacheIndex++;
             return event;
         } else {
             return unorderedEvent;
@@ -131,7 +132,7 @@
             }
         }
 
-        if (eventCache != null) {
+        if (cached != null) {
             RecordedEvent event = cachedEvent();
             event.startTimeTicks = startTicks;
             event.endTimeTicks = endTicks;
@@ -155,11 +156,11 @@
     }
 
     public void resetCache() {
-        index = 0;
+        cacheIndex = 0;
     }
 
     public boolean hasReuse() {
-        return eventCache != null;
+        return cached != null;
     }
 
     public void setReuse(boolean reuse) {
@@ -167,10 +168,10 @@
             return;
         }
         if (reuse) {
-            eventCache = new RecordedEvent[2];
-            index = 0;
+            cached = new RecordedEvent[2];
+            cacheIndex = 0;
         } else {
-            eventCache = null;
+            cached = null;
         }
     }
 
@@ -187,7 +188,6 @@
             return;
         }
         this.ordered = ordered;
-        this.index = 0;
+        this.cacheIndex = 0;
     }
-
 }
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java	Thu Sep 05 16:46:50 2019 +0200
@@ -173,10 +173,11 @@
      *
      * @return {@code true} if the action was removed, {@code false} otherwise
      *
-     * @see #onClose(Runnable)
-     * @see #onFlush(Runnable)
      * @see #onEvent(Consumer)
      * @see #onEvent(String, Consumer)
+     * @see #onFlush(Runnable)
+     * @see #onClose(Runnable)
+     * @see #onError(Consumer)
      */
     boolean remove(Object action);
 
@@ -252,16 +253,21 @@
      *
      * @param timeout the maximum time to wait, not {@code null}
      *
+     * @throws IllegalArgumentException if timeout is negative
+     * @throws InterruptedException
+     *
      * @see #start()
      * @see #startAsync()
      */
-    void awaitTermination(Duration timeout);
+    void awaitTermination(Duration timeout) throws InterruptedException;
 
     /**
      * Blocks the current thread until the stream is finished or closed.
      *
+     * @throws InterruptedException
+     *
      * @see #start()
      * @see #startAsync()
      */
-    void awaitTermination();
+    void awaitTermination() throws InterruptedException;
 }
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/ObjectContext.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/ObjectContext.java	Thu Sep 05 16:46:50 2019 +0200
@@ -45,19 +45,19 @@
         this.timeConverter = timeConverter;
     }
 
-    private ObjectContext(ObjectContext parent, ValueDescriptor desc) {
+    private ObjectContext(ObjectContext parent, ValueDescriptor descriptor) {
         this.eventType = parent.eventType;
         this.contextLookup = parent.contextLookup;
         this.timeConverter = parent.timeConverter;
-        this.fields = desc.getFields();
+        this.fields = descriptor.getFields();
     }
 
-    public ObjectContext getInstance(ValueDescriptor desc) {
-        ObjectContext h = contextLookup.get(desc);
-        if (h == null) {
-            h = new ObjectContext(this, desc);
-            contextLookup.put(desc, h);
+    public ObjectContext getInstance(ValueDescriptor descriptor) {
+        ObjectContext context = contextLookup.get(descriptor);
+        if (context == null) {
+            context = new ObjectContext(this, descriptor);
+            contextLookup.put(descriptor, context);
         }
-        return h;
+        return context;
     }
 }
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java	Thu Sep 05 16:46:50 2019 +0200
@@ -347,12 +347,12 @@
     }
 
     @Override
-    public void awaitTermination(Duration timeout) {
+    public void awaitTermination(Duration timeout) throws InterruptedException {
         directoryStream.awaitTermination(timeout);
     }
 
     @Override
-    public void awaitTermination() {
+    public void awaitTermination() throws InterruptedException {
         directoryStream.awaitTermination();
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/StreamConfiguration.java	Thu Sep 05 16:46:50 2019 +0200
@@ -0,0 +1,268 @@
+package jdk.jfr.consumer;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+
+import jdk.jfr.EventType;
+import jdk.jfr.consumer.AbstractEventStream.EventDispatcher;
+import jdk.jfr.internal.LongMap;
+import jdk.jfr.internal.Utils;
+import jdk.jfr.internal.consumer.InternalEventFilter;
+
+final class StreamConfiguration {
+    private static final Runnable[] NO_ACTIONS = new Runnable[0];
+
+    Consumer<?>[] errorActions = new Consumer<?>[0];
+    private Runnable[] flushActions = NO_ACTIONS;
+    private Runnable[] closeActions = NO_ACTIONS;
+    private EventDispatcher[] dispatchers = EventDispatcher.NO_DISPATCHERS;
+    private InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL;
+    LongMap<EventDispatcher[]> dispatcherLookup = new LongMap<>();
+
+    private boolean changedConfiguration = false;
+    private boolean closed = false;
+    private boolean reuse = true;
+    private boolean ordered = true;
+    private Instant startTime = null;
+    private Instant endTime = null;
+    private boolean started = false;
+    private long startNanos = 0;
+    private long endNanos = Long.MAX_VALUE;
+
+    // Cache the last event type and dispatch.
+    EventType cacheEventType;
+    EventDispatcher[] cacheDispatchers;
+
+
+    public StreamConfiguration(StreamConfiguration configuration) {
+        this.flushActions = configuration.flushActions;
+        this.closeActions = configuration.closeActions;
+        this.errorActions = configuration.errorActions;
+        this.dispatchers = configuration.dispatchers;
+        this.eventFilter = configuration.eventFilter;
+        this.closed = configuration.closed;
+        this.reuse = configuration.reuse;
+        this.ordered = configuration.ordered;
+        this.startTime = configuration.startTime;
+        this.endTime = configuration.endTime;
+        this.started = configuration.started;
+        this.startNanos = configuration.startNanos;
+        this.endNanos = configuration.endNanos;
+        this.dispatcherLookup = configuration.dispatcherLookup;
+    }
+
+    public StreamConfiguration() {
+    }
+
+    public StreamConfiguration remove(Object action) {
+        flushActions = remove(flushActions, action);
+        closeActions = remove(closeActions, action);
+        errorActions = remove(errorActions, action);
+        dispatchers = removeDispatch(dispatchers, action);
+        return this;
+    }
+
+    public StreamConfiguration addDispatcher(EventDispatcher e) {
+        dispatchers = add(dispatchers, e);
+        eventFilter = buildFilter(dispatchers);
+        dispatcherLookup = new LongMap<>();
+        return this;
+    }
+
+    public StreamConfiguration addFlushAction(Runnable action) {
+        flushActions = add(flushActions, action);
+        return this;
+    }
+
+    public StreamConfiguration addCloseAction(Runnable action) {
+        closeActions = add(closeActions, action);
+        return this;
+    }
+
+    public StreamConfiguration addErrorAction(Consumer<Throwable> action) {
+        errorActions = add(errorActions, action);
+        return this;
+    }
+
+    public StreamConfiguration setClosed(boolean closed) {
+        this.closed = closed;
+        changedConfiguration = true;
+        return this;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public Runnable[] getCloseActions() {
+        return closeActions;
+    }
+
+    public Runnable[] getFlushActions() {
+        return flushActions;
+    }
+
+    private EventDispatcher[] removeDispatch(EventDispatcher[] array, Object action) {
+        List<EventDispatcher> list = new ArrayList<>(array.length);
+        boolean modified = false;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i].action != action) {
+                list.add(array[i]);
+            } else {
+                modified = true;
+            }
+        }
+        EventDispatcher[] result = list.toArray(new EventDispatcher[0]);
+        if (modified) {
+            eventFilter = buildFilter(result);
+            dispatcherLookup = new LongMap<>();
+            changedConfiguration = true;
+        }
+        return result;
+    }
+
+    private <T> T[] remove(T[] array, Object action) {
+        List<T> list = new ArrayList<>(array.length);
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != action) {
+                list.add(array[i]);
+            } else {
+                changedConfiguration = true;
+            }
+        }
+        return list.toArray(array);
+    }
+
+    private <T> T[] add(T[] array, T object) {
+        List<T> list = new ArrayList<>(Arrays.asList(array));
+        list.add(object);
+        changedConfiguration = true;
+        return list.toArray(array);
+    }
+
+    private static InternalEventFilter buildFilter(EventDispatcher[] dispatchers) {
+        InternalEventFilter ef = new InternalEventFilter();
+        for (EventDispatcher ed : dispatchers) {
+            String name = ed.eventName;
+            if (name == null) {
+                return InternalEventFilter.ACCEPT_ALL;
+            }
+            ef.setThreshold(name, 0);
+        }
+        return ef;
+    }
+
+    public StreamConfiguration setReuse(boolean reuse) {
+        this.reuse = reuse;
+        changedConfiguration = true;
+        return this;
+    }
+
+    public StreamConfiguration setOrdered(boolean ordered) {
+        this.ordered = ordered;
+        changedConfiguration = true;
+        return this;
+    }
+
+    public StreamConfiguration setEndTime(Instant endTime) {
+        this.endTime = endTime;
+        this.endNanos = Utils.timeToNanos(endTime);
+        changedConfiguration = true;
+        return this;
+    }
+
+    public StreamConfiguration setStartTime(Instant startTime) {
+        this.startTime = startTime;
+        this.startNanos = Utils.timeToNanos(startTime);
+        changedConfiguration = true;
+        return this;
+    }
+
+    public Instant getStartTime() {
+        return startTime;
+    }
+
+    public Object getEndTime() {
+        return endTime;
+    }
+
+    public boolean isStarted() {
+        return started;
+    }
+
+    public StreamConfiguration setStartNanos(long startNanos) {
+        this.startNanos = startNanos;
+        changedConfiguration = true;
+        return this;
+    }
+
+    public void setStarted(boolean started) {
+        this.started = started;
+        changedConfiguration = true;
+    }
+
+    public boolean hasChanged() {
+        return changedConfiguration;
+    }
+
+    public boolean getReuse() {
+        return reuse;
+    }
+
+    public boolean getOrdered() {
+        return ordered;
+    }
+
+    public InternalEventFilter getFiler() {
+        return eventFilter;
+    }
+
+    public long getStartNanos() {
+        return startNanos;
+    }
+
+    public long getEndNanos() {
+        return endNanos;
+    }
+
+    public InternalEventFilter getFilter() {
+        return eventFilter;
+    }
+
+    public void clearDispatchCache() {
+        cacheDispatchers = null;
+        cacheEventType = null;
+    }
+
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        for (Runnable flush : flushActions) {
+            sb.append("Flush Action: ").append(flush).append("\n");
+        }
+        for (Runnable close : closeActions) {
+            sb.append("Close Action: " + close + "\n");
+        }
+        for (Consumer<?> error : errorActions) {
+            sb.append("Error Action: " + error + "\n");
+        }
+        for (EventDispatcher dispatcher : dispatchers) {
+            sb.append("Dispatch Action: " + dispatcher.eventName + "(" + dispatcher + ") \n");
+        }
+        sb.append("Closed: ").append(closed).append("\n");
+        sb.append("Reuse: ").append(reuse).append("\n");
+        sb.append("Ordered: ").append(ordered).append("\n");
+        sb.append("Started: ").append(started).append("\n");
+        sb.append("Start Time: ").append(startTime).append("\n");
+        sb.append("Start Nanos: ").append(startNanos).append("\n");
+        sb.append("End Time: ").append(endTime).append("\n");
+        sb.append("End Nanos: ").append(endNanos).append("\n");
+        return sb.toString();
+    }
+
+    EventDispatcher[] getDispatchers() {
+        return dispatchers;
+    }
+}
\ No newline at end of file
--- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/StringParser.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/StringParser.java	Thu Sep 05 16:46:50 2019 +0200
@@ -36,7 +36,6 @@
     private final static Charset UTF8 = Charset.forName("UTF-8");
     private final static Charset LATIN1 = Charset.forName("ISO-8859-1");
 
-
     final static class CharsetParser extends Parser {
         private final Charset charset;
         private int lastSize;
--- a/test/jdk/jdk/jfr/api/consumer/recordingstream/TestAwaitTermination.java	Tue Sep 03 22:54:46 2019 +0200
+++ b/test/jdk/jdk/jfr/api/consumer/recordingstream/TestAwaitTermination.java	Thu Sep 05 16:46:50 2019 +0200
@@ -50,7 +50,11 @@
         try (RecordingStream r = new RecordingStream()) {
             r.startAsync();
             var c = CompletableFuture.runAsync(() -> {
-                r.awaitTermination();
+                try {
+                    r.awaitTermination();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
             });
             r.close();
             c.get();
@@ -61,7 +65,11 @@
         try (RecordingStream r = new RecordingStream()) {
             r.startAsync();
             var c = CompletableFuture.runAsync(() -> {
-                r.awaitTermination(Duration.ofMillis(10));
+                try {
+                    r.awaitTermination(Duration.ofMillis(10));
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
             });
             c.get();
             r.close();