8027316: Distinct operation on an unordered stream should not be a barrier
author psandoz
Thu, 31 Oct 2013 11:59:04 +0100
changeset 21422 6fca66995a27
parent 21421 aadfbd8b351b
child 21423 6f6edad5b031
8027316: Distinct operation on an unordered stream should not be a barrier
Reviewed-by: henryjen, mduigou, briangoetz
jdk/src/share/classes/java/util/stream/DistinctOps.java
jdk/src/share/classes/java/util/stream/StreamSpliterators.java
jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java
--- a/jdk/src/share/classes/java/util/stream/DistinctOps.java	Wed Oct 30 18:39:09 2013 -0700
+++ b/jdk/src/share/classes/java/util/stream/DistinctOps.java	Thu Oct 31 11:59:04 2013 +0100
@@ -54,6 +54,16 @@
     static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                       StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
+
+            <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+                // Reduce into a LinkedHashSet, which drops duplicates. If the stream is SORTED then it
+                // should also be ORDERED, so the following will also preserve the sort order
+                TerminalOp<T, LinkedHashSet<T>> reduceOp
+                        = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
+                                                                 LinkedHashSet::addAll);
+                return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
+            }
+
             @Override
             <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                               Spliterator<P_IN> spliterator,
@@ -63,12 +73,7 @@
                     return helper.evaluate(spliterator, false, generator);
                 }
                 else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
-                    // If the stream is SORTED then it should also be ORDERED so the following will also
-                    // preserve the sort order
-                    TerminalOp<T, LinkedHashSet<T>> reduceOp
-                            = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
-                                                                     LinkedHashSet::addAll);
-                    return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
+                    return reduce(helper, spliterator);
                 }
                 else {
                     // Holder of null state since ConcurrentHashMap does not support null values
@@ -95,6 +100,22 @@
             }
 
             @Override
+            <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
+                    // No-op: the elements are already known to be DISTINCT, just wrap the source
+                    return helper.wrapSpliterator(spliterator);
+                }
+                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+                    // Not lazy: a full reduction (barrier) is required to preserve order
+                    return reduce(helper, spliterator).spliterator();
+                }
+                else {
+                    // Lazy: ORDERED is not known, so duplicates are filtered during traversal
+                    return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));
+                }
+            }
+
+            @Override
             Sink<T> opWrapSink(int flags, Sink<T> sink) {
                 Objects.requireNonNull(sink);
 
--- a/jdk/src/share/classes/java/util/stream/StreamSpliterators.java	Wed Oct 30 18:39:09 2013 -0700
+++ b/jdk/src/share/classes/java/util/stream/StreamSpliterators.java	Thu Oct 31 11:59:04 2013 +0100
@@ -27,6 +27,7 @@
 import java.util.Comparator;
 import java.util.Objects;
 import java.util.Spliterator;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
@@ -1227,6 +1228,108 @@
     }
 
     /**
+     * A wrapping spliterator that only reports distinct elements of the
+     * underlying spliterator. Does not preserve size and encounter order.
+     *
+     * <p>Spliterators produced by {@link #trySplit()} share one
+     * {@code ConcurrentHashMap} of seen elements, so an element is reported at
+     * most once across all splits.
+     *
+     * <p>As the {@code Spliterator} contract requires, {@code tryAdvance} and
+     * {@code forEachRemaining} throw {@code NullPointerException} for a null
+     * action.
+     */
+    static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {
+
+        // The value to represent null in the ConcurrentHashMap
+        private static final Object NULL_VALUE = new Object();
+
+        // The underlying spliterator
+        private final Spliterator<T> s;
+
+        // ConcurrentHashMap holding distinct elements as keys
+        private final ConcurrentHashMap<T, Boolean> seen;
+
+        // Temporary element, only used with tryAdvance
+        private T tmpSlot;
+
+        DistinctSpliterator(Spliterator<T> s) {
+            this(s, new ConcurrentHashMap<>());
+        }
+
+        private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
+            this.s = s;
+            this.seen = seen;
+        }
+
+        // Callback handed to s.tryAdvance(this): parks the element for tryAdvance
+        @Override
+        public void accept(T t) {
+            this.tmpSlot = t;
+        }
+
+        // ConcurrentHashMap rejects null keys, so null is stood in for by NULL_VALUE
+        @SuppressWarnings("unchecked")
+        private T mapNull(T t) {
+            return t != null ? t : (T) NULL_VALUE;
+        }
+
+        @Override
+        public boolean tryAdvance(Consumer<? super T> action) {
+            // Fail fast, even when the source is exhausted or only yields duplicates
+            Objects.requireNonNull(action);
+            while (s.tryAdvance(this)) {
+                // Empty the slot before doing anything else so that neither a
+                // duplicate nor a throwing action leaves an element pinned here
+                T t = tmpSlot;
+                tmpSlot = null;
+                if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
+                    action.accept(t);
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public void forEachRemaining(Consumer<? super T> action) {
+            // Fail fast, even when nothing would be passed to the action
+            Objects.requireNonNull(action);
+            s.forEachRemaining(t -> {
+                if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
+                    action.accept(t);
+                }
+            });
+        }
+
+        // Each split wraps part of the source but shares the same seen map
+        @Override
+        public Spliterator<T> trySplit() {
+            Spliterator<T> split = s.trySplit();
+            return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
+        }
+
+        // Reports the source's estimate; elements later dropped as duplicates are still counted
+        @Override
+        public long estimateSize() {
+            return s.estimateSize();
+        }
+
+        // Size and order are not preserved, so those flags are cleared
+        @Override
+        public int characteristics() {
+            return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
+                                            Spliterator.SORTED | Spliterator.ORDERED))
+                   | Spliterator.DISTINCT;
+        }
+
+        // NOTE(review): characteristics() never reports SORTED, yet this still
+        // delegates to the source -- confirm whether it should throw instead
+        @Override
+        public Comparator<? super T> getComparator() {
+            return s.getComparator();
+        }
+    }
+
+    /**
      * A Spliterator that infinitely supplies elements in no particular order.
      *
      * <p>Splitting divides the estimated size in two and stops when the
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java	Wed Oct 30 18:39:09 2013 -0700
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java	Thu Oct 31 11:59:04 2013 +0100
@@ -28,8 +28,10 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Spliterator;
 import java.util.Spliterators;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.*;
 
 import static java.util.stream.LambdaTestHelpers.*;
@@ -48,6 +50,20 @@
         assertCountSum(countTo(10).stream().distinct(), 10, 55);
     }
 
+    public void testWithUnorderedInfiniteStream() {
+        // These tests should short-circuit, otherwise will fail with a time-out
+        // or an OOME
+
+        // Both streams are unordered and findAny() may return any element, so
+        // the only reliable assertion is that some element is found
+
+        Optional<Integer> oi = Stream.iterate(1, i -> i + 1).unordered().parallel().distinct().findAny();
+        assertTrue(oi.isPresent());
+
+        oi = ThreadLocalRandom.current().ints().boxed().parallel().distinct().findAny();
+        assertTrue(oi.isPresent());
+    }
+
     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOp(String name, TestData.OfRef<Integer> data) {
         Collection<Integer> result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct);