diff -r e3d13a15c5c0 -r b786c0de868c jdk/src/share/classes/java/util/stream/DistinctOps.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/classes/java/util/stream/DistinctOps.java Wed Apr 24 16:15:47 2013 -0700 @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntFunction; + +/** + * Factory methods for transforming streams into duplicate-free streams, using + * {@link Object#equals(Object)} to determine equality. + * + * @since 1.8 + */ +final class DistinctOps { + + private DistinctOps() { } + + /** + * Appends a "distinct" operation to the provided stream, and returns the + * new stream. + * + * @param the type of both input and output elements + * @param upstream a reference stream with element type T + * @return the new stream + */ + static ReferencePipeline makeRef(AbstractPipeline upstream) { + return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE, + StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) { + @Override + Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) { + // No-op + return helper.evaluate(spliterator, false, generator); + } + else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + // If the stream is SORTED then it should also be ORDERED so the following will also + // preserve the sort order + TerminalOp> reduceOp + = ReduceOps.>makeRef(LinkedHashSet::new, LinkedHashSet::add, + LinkedHashSet::addAll); + return Nodes.node(reduceOp.evaluateParallel(helper, spliterator)); + } + else { + // Holder of null state since ConcurrentHashMap does not support null values + AtomicBoolean seenNull = new AtomicBoolean(false); + ConcurrentHashMap map = new ConcurrentHashMap<>(); + TerminalOp forEachOp = ForEachOps.makeRef(t -> { + if (t == null) + seenNull.set(true); + else + map.putIfAbsent(t, Boolean.TRUE); + }, false); + forEachOp.evaluateParallel(helper, spliterator); + + // If null has been seen then copy the key set into a HashSet that supports null values + // and add null + Set keys = map.keySet(); + if (seenNull.get()) { + // TODO Implement a more efficient set-union view, rather than copying + keys = new HashSet<>(keys); + keys.add(null); + } + return Nodes.node(keys); + } + } + + @Override + Sink opWrapSink(int flags, Sink sink) { + Objects.requireNonNull(sink); + + if (StreamOpFlag.DISTINCT.isKnown(flags)) { + return sink; + } else if (StreamOpFlag.SORTED.isKnown(flags)) { + return new Sink.ChainedReference(sink) { + boolean seenNull; + T lastSeen; + + @Override + public void begin(long size) { + seenNull = false; + lastSeen = null; + downstream.begin(-1); + } + + @Override + public void end() { + seenNull = false; + lastSeen = null; + downstream.end(); + } + + @Override + public void accept(T t) { + if (t == null) { + if (!seenNull) { + seenNull = true; + downstream.accept(lastSeen = null); + } + } else if (lastSeen == null || !t.equals(lastSeen)) { + downstream.accept(lastSeen = t); + } + } + }; + } else { + return new Sink.ChainedReference(sink) { + Set seen; + + @Override + public void begin(long size) { + seen = new HashSet<>(); + downstream.begin(-1); + } + + @Override + public void end() { + seen = null; + downstream.end(); + } + + @Override + public void accept(T t) { + if (!seen.contains(t)) { + seen.add(t); + downstream.accept(t); + } + } + }; + } + } + }; + } +}