8027316: Distinct operation on an unordered stream should not be a barrier
Reviewed-by: henryjen, mduigou, briangoetz
--- a/jdk/src/share/classes/java/util/stream/DistinctOps.java Wed Oct 30 18:39:09 2013 -0700
+++ b/jdk/src/share/classes/java/util/stream/DistinctOps.java Thu Oct 31 11:59:04 2013 +0100
@@ -54,6 +54,16 @@
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
+
+ <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+ // If the stream is SORTED then it should also be ORDERED so the following will also
+ // preserve the sort order
+ TerminalOp<T, LinkedHashSet<T>> reduceOp
+ = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
+ LinkedHashSet::addAll);
+ return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
+ }
+
@Override
<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
@@ -63,12 +73,7 @@
return helper.evaluate(spliterator, false, generator);
}
else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
- // If the stream is SORTED then it should also be ORDERED so the following will also
- // preserve the sort order
- TerminalOp<T, LinkedHashSet<T>> reduceOp
- = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
- LinkedHashSet::addAll);
- return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
+ return reduce(helper, spliterator);
}
else {
// Holder of null state since ConcurrentHashMap does not support null values
@@ -95,6 +100,22 @@
}
@Override
+ <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
+ // No-op
+ return helper.wrapSpliterator(spliterator);
+ }
+ else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+ // Not lazy, barrier required to preserve order
+ return reduce(helper, spliterator).spliterator();
+ }
+ else {
+ // Lazy
+ return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));
+ }
+ }
+
+ @Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
--- a/jdk/src/share/classes/java/util/stream/StreamSpliterators.java Wed Oct 30 18:39:09 2013 -0700
+++ b/jdk/src/share/classes/java/util/stream/StreamSpliterators.java Thu Oct 31 11:59:04 2013 +0100
@@ -27,6 +27,7 @@
import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
@@ -1227,6 +1228,88 @@
}
/**
+ * A wrapping spliterator that only reports distinct elements of the
+ * underlying spliterator. Does not preserve size and encounter order.
+ */
+ static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {
+
+ // The value to represent null in the ConcurrentHashMap
+ private static final Object NULL_VALUE = new Object();
+
+ // The underlying spliterator
+ private final Spliterator<T> s;
+
+ // ConcurrentHashMap holding distinct elements as keys
+ private final ConcurrentHashMap<T, Boolean> seen;
+
+ // Temporary element, only used with tryAdvance
+ private T tmpSlot;
+
+ DistinctSpliterator(Spliterator<T> s) {
+ this(s, new ConcurrentHashMap<>());
+ }
+
+ private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
+ this.s = s;
+ this.seen = seen;
+ }
+
+ @Override
+ public void accept(T t) {
+ this.tmpSlot = t;
+ }
+
+ @SuppressWarnings("unchecked")
+ private T mapNull(T t) {
+ return t != null ? t : (T) NULL_VALUE;
+ }
+
+ @Override
+ public boolean tryAdvance(Consumer<? super T> action) {
+ while (s.tryAdvance(this)) {
+ if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) {
+ action.accept(tmpSlot);
+ tmpSlot = null;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void forEachRemaining(Consumer<? super T> action) {
+ s.forEachRemaining(t -> {
+ if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
+ action.accept(t);
+ }
+ });
+ }
+
+ @Override
+ public Spliterator<T> trySplit() {
+ Spliterator<T> split = s.trySplit();
+ return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
+ }
+
+ @Override
+ public long estimateSize() {
+ return s.estimateSize();
+ }
+
+ @Override
+ public int characteristics() {
+ return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
+ Spliterator.SORTED | Spliterator.ORDERED))
+ | Spliterator.DISTINCT;
+ }
+
+ @Override
+ public Comparator<? super T> getComparator() {
+ return s.getComparator();
+ }
+ }
+
+ /**
* A Spliterator that infinitely supplies elements in no particular order.
*
* <p>Splitting divides the estimated size in two and stops when the
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java Wed Oct 30 18:39:09 2013 -0700
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java Thu Oct 31 11:59:04 2013 +0100
@@ -28,8 +28,10 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
+import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.*;
import static java.util.stream.LambdaTestHelpers.*;
@@ -48,6 +50,17 @@
assertCountSum(countTo(10).stream().distinct(), 10, 55);
}
+ public void testWithUnorderedInfiniteStream() {
+ // These tests should short-circuit, otherwise will fail with a time-out
+ // or an OOME
+
+ Integer one = Stream.iterate(1, i -> i + 1).unordered().parallel().distinct().findAny().get();
+ assertEquals(one.intValue(), 1);
+
+ Optional<Integer> oi = ThreadLocalRandom.current().ints().boxed().parallel().distinct().findAny();
+ assertTrue(oi.isPresent());
+ }
+
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testOp(String name, TestData.OfRef<Integer> data) {
Collection<Integer> result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct);