# HG changeset patch # User egahlin # Date 1562187104 -7200 # Node ID 099789ceff7dff48d5b85f714ca2bd7a26f4e2bf # Parent 216bf2e3b542ad500a4b78f684fc827208a9632b Provide implementation for all added API methods diff -r 216bf2e3b542 -r 099789ceff7d src/jdk.jfr/share/classes/jdk/jfr/consumer/EventConsumer.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventConsumer.java Wed Jul 03 22:51:44 2019 +0200 @@ -0,0 +1,393 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.jfr.consumer; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + +import jdk.jfr.EventType; +import jdk.jfr.internal.JVM; +import jdk.jfr.internal.LogLevel; +import jdk.jfr.internal.LogTag; +import jdk.jfr.internal.Logger; +import jdk.jfr.internal.LongMap; +import jdk.jfr.internal.consumer.InternalEventFilter; + +abstract class EventConsumer implements Runnable { + + public final static Instant NEXT_EVENT = Instant.now(); + + final static class EventDispatcher { + public final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0]; + + final private String eventName; + final Consumer action; + + public EventDispatcher(Consumer action) { + this(null, action); + } + + public EventDispatcher(String eventName, Consumer action) { + this.eventName = eventName; + this.action = action; + } + + public void offer(RecordedEvent event) { + action.accept(event); + } + + public boolean accepts(EventType eventType) { + return (eventName == null || eventType.getName().equals(eventName)); + } + } + + private final static JVM jvm = JVM.getJVM(); + private final static VarHandle closedHandle; + private final static VarHandle consumersHandle; + private final static VarHandle dispatcherHandle; + private final static VarHandle flushActionsHandle; + private final static VarHandle closeActionsHandle; + private final static VarHandle orderedHandle; + private final static VarHandle reuseHandle; + private final static VarHandle startTimeHandle; + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + closedHandle = l.findVarHandle(EventConsumer.class, "closed", boolean.class); + consumersHandle = l.findVarHandle(EventConsumer.class, "consumers", EventDispatcher[].class); + dispatcherHandle = l.findVarHandle(EventConsumer.class, "dispatcher", LongMap.class); + flushActionsHandle = l.findVarHandle(EventConsumer.class, "flushActions", Runnable[].class); + closeActionsHandle = l.findVarHandle(EventConsumer.class, "closeActions", Runnable[].class); + orderedHandle = l.findVarHandle(EventConsumer.class, "ordered", boolean.class); + reuseHandle = l.findVarHandle(EventConsumer.class, "reuse", boolean.class); + startTimeHandle = l.findVarHandle(EventConsumer.class, "startTime", Instant.class); + } catch (ReflectiveOperationException e) { + throw new InternalError(e); + } + } + // set by VarHandle + private boolean closed; + // set by VarHandle + private EventDispatcher[] consumers = new EventDispatcher[0]; + // set by VarHandle + private LongMap dispatcher = new LongMap<>(); + // set by VarHandle + private Runnable[] flushActions = new Runnable[0]; + // set by VarHandle + private Runnable[] closeActions = new Runnable[0]; + + private final AccessControlContext accessControlContext; + protected volatile InternalEventFilter eventFilter = InternalEventFilter.ACCEPT_ALL; + + private boolean started; + private Thread thread; + + protected long startNanos; + protected boolean ordered = true; + protected boolean reuse = true; + Instant startTime; + + public EventConsumer(AccessControlContext acc) throws IOException { + this.accessControlContext = acc; + } + + public void run() { + doPriviliged(() -> execute()); + } + + void doPriviliged(Runnable r) { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + r.run(); + return null; + } + }, accessControlContext); + } + + private void execute() { + jvm.exclude(Thread.currentThread()); + try { + updateStartNanos(); + process(); + } catch (IOException e) { + if (!isClosed()) { + logException(e); + } + } catch (Exception e) { + logException(e); + } finally { + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); + } + } + + // User setting overrides default + private void updateStartNanos() { + if (startTime != null) { + try { + setStartNanos(startTime.toEpochMilli() * 1_000_000L); + } catch (ArithmeticException ae) { + setStartNanos(Long.MAX_VALUE); + } + } + } + + private void logException(Exception e) { + e.printStackTrace(); // for debugging purposes, remove before + // integration + Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Unexpected error processing stream. " + e.getMessage()); + } + + public abstract void process() throws IOException; + + public synchronized boolean remove(Object action) { + boolean remove = false; + Runnable[] updatedFlushActions = removeAction(flushActions, action); + if (updatedFlushActions != null) { + flushActionsHandle.setVolatile(this, updatedFlushActions); + remove = true; + } + Runnable[] updatedCloseActions = removeAction(closeActions, action); + if (updatedCloseActions != null) { + closeActionsHandle.setVolatile(this, updatedCloseActions); + remove = true; + } + + boolean removeConsumer = false; + List list = new ArrayList<>(); + for (int i = 0; i < consumers.length; i++) { + if (consumers[i].action != action) { + list.add(consumers[i]); + } else { + removeConsumer = true; + remove = true; + } + } + if (removeConsumer) { + EventDispatcher[] array = list.toArray(new EventDispatcher[list.size()]); + eventFilter = buildFilter(array); + consumersHandle.setVolatile(this, array); + dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset + // dispatch + } + return remove; + } + + public void dispatch(RecordedEvent e) { + if (e.endTime < startNanos) { + return; + } + + EventDispatcher[] consumerDispatch = dispatcher.get(e.getEventType().getId()); + if (consumerDispatch == null) { + consumerDispatch = EventDispatcher.NO_DISPATCHERS; + for (EventDispatcher ec : consumers.clone()) { + if (ec.accepts(e.getEventType())) { + consumerDispatch = merge(consumerDispatch, ec); + } + } + dispatcher.put(e.getEventType().getId(), consumerDispatch); + } + for (int i = 0; i < consumerDispatch.length; i++) { + try { + consumerDispatch[i].offer(e); + } catch (Exception exception) { + // Is this a reasonable behavior for an exception? + // Error will abort the stream. + } + } + + } + + public void onEvent(Consumer action) { + add(new EventDispatcher(action)); + } + + public void onEvent(String eventName, Consumer action) { + add(new EventDispatcher(eventName, action)); + } + + InternalEventFilter buildFilter(EventDispatcher[] dispatchers) { + InternalEventFilter ef = new InternalEventFilter(); + for (EventDispatcher ed : dispatchers) { + String name = ed.eventName; + if (name == null) { + return InternalEventFilter.ACCEPT_ALL; + } + ef.setThreshold(name, 0); + } + return ef.threadSafe(); + } + + private synchronized void add(EventDispatcher e) { + EventDispatcher[] dispatchers = merge(consumers, e); + eventFilter = buildFilter(dispatchers); + consumersHandle.setVolatile(this, dispatchers); + dispatcherHandle.setVolatile(this, new LongMap<>()); // will reset + } + + public synchronized void onFlush(Runnable action) { + flushActionsHandle.setVolatile(this, addAction(flushActions, action)); + } + + public synchronized void addCloseAction(Runnable action) { + closeActionsHandle.setVolatile(this, addAction(closeActions, action)); + } + + public void setClosed(boolean closed) { + closedHandle.setVolatile(this, closed); + } + + final public boolean isClosed() { + return closed; + } + + public void runCloseActions() { + + Runnable[] cas = this.closeActions; + for (int i = 0; i < cas.length; i++) { + cas[i].run(); + } + } + + public void runFlushActions() { + Runnable[] fas = this.flushActions; + for (int i = 0; i < fas.length; i++) { + fas[i].run(); + } + } + + public void startAsync(long startNanos) { + if (started) { + throw new IllegalStateException("Event stream can only be started once"); + } + started = true; + setStartNanos(startNanos); + thread = new Thread(this); + thread.setDaemon(true); + thread.start(); + } + + public void start(long startNanos) { + synchronized (this) { + if (started) { + throw new IllegalStateException("Event stream can only be started once"); + } + started = true; + setStartNanos(startNanos); + } + run(); + } + + public void awaitTermination(Duration timeout) { + Objects.requireNonNull(timeout); + Thread t = null; + synchronized (this) { + t = thread; + } + if (t != null && t != Thread.currentThread()) { + try { + t.join(timeout.toMillis()); + } catch (InterruptedException e) { + // ignore + } + } + } + + public void awaitTermination() { + awaitTermination(Duration.ofMillis(0)); + } + + private void setStartNanos(long startNanos) { + this.startNanos = startNanos; + } + + protected static EventDispatcher[] merge(EventDispatcher[] current, EventDispatcher add) { + EventDispatcher[] array = new EventDispatcher[current.length + 1]; + System.arraycopy(current, 0, array, 0, current.length); + array[current.length] = add; + return array; + } + + private static Runnable[] removeAction(Runnable[] array, Object action) { + if (array.length == 0) { + return null; + } + boolean remove = false; + List list = new ArrayList<>(); + for (int i = 0; i < array.length; i++) { + if (array[i] != action) { + list.add(array[i]); + } else { + remove = true; + } + } + if (remove) { + return list.toArray(new Runnable[list.size()]); + } + return null; + } + + private static Runnable[] addAction(Runnable[] array, Runnable action) { + ArrayList a = new ArrayList<>(); + a.addAll(Arrays.asList(array)); + a.add(action); + return a.toArray(new Runnable[0]); + } + + abstract public void close(); + + public void setReuse(boolean reuse) { + reuseHandle.setVolatile(this, reuse); + } + + public void setOrdered(boolean ordered) { + orderedHandle.setVolatile(this, ordered); + } + + public void setStartTime(Instant startTime) { + Objects.nonNull(startTime); + if (started) { + throw new IllegalStateException("Stream is already started"); + } + if (startTime.isBefore(Instant.EPOCH)) { + startTime = Instant.EPOCH; + } + startTimeHandle.setVolatile(this, startTime); + } + +} \ No newline at end of file diff -r 216bf2e3b542 -r 099789ceff7d src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Thu Jun 27 10:41:01 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventDirectoryStream.java Wed Jul 03 22:51:44 2019 +0200 @@ -35,7 +35,6 @@ import java.util.Objects; import java.util.function.Consumer; -import jdk.jfr.internal.consumer.EventConsumer; import jdk.jfr.internal.consumer.RecordingInput; import jdk.jfr.internal.consumer.RepositoryFiles; @@ -46,23 +45,32 @@ */ final class EventDirectoryStream implements EventStream { - static final class ParserConsumer extends EventConsumer { + static final class DirectoryConsumer extends EventConsumer { private static final Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime); private static final int DEFAULT_ARRAY_SIZE = 10_000; private final RepositoryFiles repositoryFiles; private ChunkParser chunkParser; private RecordedEvent[] sortedList; + protected long chunkStartNanos; - public ParserConsumer(AccessControlContext acc, Path p) throws IOException { + public DirectoryConsumer(AccessControlContext acc, Path p) throws IOException { super(acc); repositoryFiles = new RepositoryFiles(p); } @Override public void process() throws IOException { - Path path = repositoryFiles.nextPath(startNanos); - startNanos = repositoryFiles.getTimestamp(path) + 1; + chunkStartNanos = startNanos; + Path path; + if (startTime == EventConsumer.NEXT_EVENT) { + // TODO: Need to skip forward to the next event + // For now, use the last chunk. + path = repositoryFiles.lastPath(); + } else { + path = repositoryFiles.nextPath(chunkStartNanos); + } + chunkStartNanos = repositoryFiles.getTimestamp(path) + 1; try (RecordingInput input = new RecordingInput(path.toFile())) { chunkParser = new ChunkParser(input, this.reuse); while (!isClosed()) { @@ -81,11 +89,11 @@ runFlushActions(); } - path = repositoryFiles.nextPath(startNanos); + path = repositoryFiles.nextPath(chunkStartNanos); if (path == null) { return; // stream closed } - startNanos = repositoryFiles.getTimestamp(path) + 1; + chunkStartNanos = repositoryFiles.getTimestamp(path) + 1; input.setFile(path); chunkParser = chunkParser.newChunkParser(); } @@ -147,8 +155,9 @@ private final EventConsumer eventConsumer; - public EventDirectoryStream(AccessControlContext acc, Path p) throws IOException { - eventConsumer = new ParserConsumer(acc, p); + public EventDirectoryStream(AccessControlContext acc, Path p, Instant startTime) throws IOException { + eventConsumer = new DirectoryConsumer(acc, p); + eventConsumer.startTime = startTime; } public void close() { @@ -223,4 +232,9 @@ public void setOrdered(boolean ordered) { eventConsumer.setOrdered(ordered); } + + @Override + public void setStartTime(Instant startTime) { + eventConsumer.setStartTime(startTime); + } } diff -r 216bf2e3b542 -r 099789ceff7d src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Thu Jun 27 10:41:01 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventFileStream.java Wed Jul 03 22:51:44 2019 +0200 @@ -30,12 +30,12 @@ import java.security.AccessControlContext; import java.security.AccessController; import java.time.Duration; +import java.time.Instant; import java.util.Arrays; import java.util.Comparator; import java.util.Objects; import java.util.function.Consumer; -import jdk.jfr.internal.consumer.EventConsumer; import jdk.jfr.internal.consumer.RecordingInput; /** @@ -44,7 +44,7 @@ */ final class EventFileStream implements EventStream { - private final static class FileEventConsumer extends EventConsumer { + private final static class FileConsumer extends EventConsumer { private static final Comparator END_TIME = (e1, e2) -> Long.compare(e1.endTime, e2.endTime); private static final int DEFAULT_ARRAY_SIZE = 100_000; private final RecordingInput input; @@ -53,7 +53,7 @@ private RecordedEvent[] sortedList; private boolean ordered; - public FileEventConsumer(AccessControlContext acc, RecordingInput input) throws IOException { + public FileConsumer(AccessControlContext acc, RecordingInput input) throws IOException { super(acc); this.input = input; } @@ -129,15 +129,17 @@ public void close() { } + + } private final RecordingInput input; - private final FileEventConsumer eventConsumer; + private final FileConsumer eventConsumer; - public EventFileStream(Path path) throws IOException { + public EventFileStream(Path path, Instant from, Instant to) throws IOException { Objects.requireNonNull(path); input = new RecordingInput(path.toFile()); - eventConsumer = new FileEventConsumer(AccessController.getContext(), input); + eventConsumer = new FileConsumer(AccessController.getContext(), input); } @Override @@ -212,4 +214,9 @@ public void setOrdered(boolean ordered) { eventConsumer.setOrdered(ordered); } + + @Override + public void setStartTime(Instant startTime) { + eventConsumer.setStartTime(startTime); + } } diff -r 216bf2e3b542 -r 099789ceff7d src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Thu Jun 27 10:41:01 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java Wed Jul 03 22:51:44 2019 +0200 @@ -38,49 +38,36 @@ public interface EventStream extends AutoCloseable { /** - * Creates a stream starting from the next written event in a disk - * repository. + * Creates a stream from a disk repository. + *

+ * By default, the stream will start with the next event that is flushed by + * Flight Recorder. * * @param directory location of the disk repository, not {@code null} + * * @return an event stream, not {@code null} + * + * @throws IOException if a stream can't be opened, or an I/O error occurs + * during reading */ public static EventStream openRepository(Path directory) throws IOException { - return new EventDirectoryStream(AccessController.getContext(), directory); + return new EventDirectoryStream(AccessController.getContext(), directory, EventConsumer.NEXT_EVENT); } /** - * Creates an event stream starting from the first event in a file. + * Creates an event stream from a file. + *

+ * By default, the stream will start with the first event in the file. * * @param file location of the file, not {@code null} + * * @return an event stream, not {@code null} * - * @throws IOException if a stream can't be opened,or an I/O error occurs + * @throws IOException if a stream can't be opened, or an I/O error occurs * during reading */ public static EventStream openFile(Path file) throws IOException { - return new EventFileStream(file); - } - - /** - * Creates an event stream starting start time and end time in a file. - * - * @param file location of the file, not {@code null} - * - * @param the start start time for the stream, or {@code null} to get data - * from the beginning of the - * - * @param the end end time for the stream, or {@code null} to get data until - * the end. - * - * @throws IllegalArgumentException if {@code end} happens before - * {@code start} - * - * @throws IOException if a stream can't be opened,or an I/O error occurs - * during reading - */ - public static EventStream openFile(Path file, Instant from, Instant to) throws IOException { - throw new UnsupportedOperationException("Not yet implemented"); - // return new EventFileStream(file); + return new EventFileStream(file, null, null); } /** @@ -95,6 +82,7 @@ * Performs an action on all events in the stream with a specified name. * * @param eventName the name of the event, not {@code null} + * * @param action an action to be performed on each {@code RecordedEvent} * that matches the event name, not {@code null} */ @@ -110,6 +98,9 @@ /** * Performs an action when the event stream is closed. + *

+ * If the stream is already closed, the action will be executed immediately + * in the current thread. * * @param action an action to be performed after the stream has been closed, * not {@code null} @@ -128,6 +119,7 @@ * removed. * * @param action the action to remove, not {@code null} + * * @return {@code true} if the action was removed, {@code false} otherwise * * @see #onClose(Runnable) @@ -138,35 +130,40 @@ boolean remove(Object action); /** - * Hint that the event object in an {@link #onEvent(Consumer)} action - * may be reused. + * Specifies that the event object in an {@link #onEvent(Consumer)} action + * is to be reused. *

* If reuse is set to {@code true), a callback should not keep a reference * to the event object after the callback from {@code onEvent} has returned. - *

- * By default reuse is set to {@code true} * - * @param resuse if event objects can be reused between calls to {@code #onEvent(Consumer)} + * @param resuse if event objects can be reused between calls to + * {@code #onEvent(Consumer)} * */ public void setReuse(boolean reuse); /** - * Orders events in chronological order aft the end timestamp + * Specifies that events arrives in chronological order, sorted by the time + * they were committed to the event stream. * - * TODO: WHAT ABOUT EVENTS THAT OCCUR WAY OUT OF ORDER - *

- * By default ordered is set to {@code true} - * - * @param ordered if event objects arrive in chronological order to {@code #onEvent(Consumer)} + * @param ordered if event objects arrive in chronological order to + * {@code #onEvent(Consumer)} */ public void setOrdered(boolean ordered); /** - * Starts processing events in the stream. + * Specifies start time of the event stream. + * + * @param startTime the start time, not {@code null} + * + * @throws IllegalStateException if the stream has already been started + */ + public void setStartTime(Instant startTime); + + /** + * Start processing events in the stream. *

- * All actions will performed on this stream will happen in the current - * thread. + * All actions performed on this stream will happen in the current thread. * * @throws IllegalStateException if the stream is already started or if it * has been closed @@ -174,11 +171,11 @@ void start(); /** - * Starts processing events in the stream asynchronously. + * Start processing events in the stream asynchronously. *

* All actions on this stream will be performed in a separate thread. * - * @throws IllegalStateException if the stream is already started or if it + * @throws IllegalStateException if the stream is already started, or if it * has been closed */ void startAsync(); diff -r 216bf2e3b542 -r 099789ceff7d src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java --- a/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Thu Jun 27 10:41:01 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java Wed Jul 03 22:51:44 2019 +0200 @@ -29,6 +29,7 @@ import java.security.AccessControlContext; import java.security.AccessController; import java.time.Duration; +import java.time.Instant; import java.util.Map; import java.util.function.Consumer; @@ -42,8 +43,8 @@ import jdk.jfr.internal.Utils; /** - * An event stream produces events from a file, directory or a running JVM (Java - * Virtual Machine). + * An recording stream produces events from a running JVM (Java Virtual + * Machine). */ public class RecordingStream implements AutoCloseable, EventStream { @@ -81,7 +82,7 @@ this.recording = new Recording(); this.recording.setFlushInterval(Duration.ofMillis(1000)); try { - this.stream = new EventDirectoryStream(acc, null); + this.stream = new EventDirectoryStream(acc, null, null); } catch (IOException ioe) { throw new IllegalStateException(ioe.getMessage()); } @@ -156,7 +157,8 @@ * The following example shows how to merge settings. * *

-     *     {@code
+     * {
+     *     @code
      *     Map settings = recording.getSettings();
      *     settings.putAll(additionalSettings);
      *     recordingStream.setSettings(settings);
@@ -214,8 +216,9 @@
     public EventSettings disable(Class eventClass) {
         return recording.disable(eventClass);
     }
+
     /**
-     * Determines how far back data is kept for the stream if the stream can't
+     * Determines how far back data is kept for the stream, if the stream can't
      * keep up.
      * 

* To control the amount of recording data stored on disk, the maximum @@ -314,7 +317,7 @@ * Determines how often events are made available for streaming. * * @param interval the interval at which events are made available to the - * stream + * stream, no {@code null} * * @throws IllegalArgumentException if interval is negative * @@ -338,4 +341,9 @@ public void setOrdered(boolean ordered) { stream.setOrdered(ordered); } + + @Override + public void setStartTime(Instant startTime) { + stream.setStartTime(startTime); + } } diff -r 216bf2e3b542 -r 099789ceff7d src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java --- a/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Thu Jun 27 10:41:01 2019 +0200 +++ b/src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/RepositoryFiles.java Wed Jul 03 22:51:44 2019 +0200 @@ -34,6 +34,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -45,7 +47,7 @@ public final class RepositoryFiles { private final Path repostory; - private final SortedMap pathSet = new TreeMap<>(); + private final NavigableMap pathSet = new TreeMap<>(); private final Map pathLookup = new HashMap<>(); private volatile boolean closed; @@ -57,8 +59,18 @@ return pathLookup.get(p); } + public Path lastPath() { + return nextPath(-1); + } + public Path nextPath(long startTimeNanos) { while (!closed) { + if (startTimeNanos == -1) { + Entry e = pathSet.lastEntry(); + if (e != null) { + return e.getValue(); + } + } SortedMap after = pathSet.tailMap(startTimeNanos); if (!after.isEmpty()) { Path path = after.get(after.firstKey()); @@ -147,4 +159,6 @@ pathSet.notify(); } } + + } \ No newline at end of file