jdk/src/share/classes/java/util/stream/DistinctOps.java
changeset 17182 b786c0de868c
child 19593 ce0cd954351c
equal deleted inserted replaced
17181:e3d13a15c5c0 17182:b786c0de868c
       
     1 /*
       
     2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 package java.util.stream;
       
    26 
       
    27 import java.util.HashSet;
       
    28 import java.util.LinkedHashSet;
       
    29 import java.util.Objects;
       
    30 import java.util.Set;
       
    31 import java.util.Spliterator;
       
    32 import java.util.concurrent.ConcurrentHashMap;
       
    33 import java.util.concurrent.atomic.AtomicBoolean;
       
    34 import java.util.function.IntFunction;
       
    35 
       
    36 /**
       
    37  * Factory methods for transforming streams into duplicate-free streams, using
       
    38  * {@link Object#equals(Object)} to determine equality.
       
    39  *
       
    40  * @since 1.8
       
    41  */
       
    42 final class DistinctOps {
       
    43 
       
    44     private DistinctOps() { }
       
    45 
       
    46     /**
       
    47      * Appends a "distinct" operation to the provided stream, and returns the
       
    48      * new stream.
       
    49      *
       
    50      * @param <T> the type of both input and output elements
       
    51      * @param upstream a reference stream with element type T
       
    52      * @return the new stream
       
    53      */
       
    54     static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
       
    55         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
       
    56                                                       StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
       
    57             @Override
       
    58             <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
       
    59                                               Spliterator<P_IN> spliterator,
       
    60                                               IntFunction<T[]> generator) {
       
    61                 if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
       
    62                     // No-op
       
    63                     return helper.evaluate(spliterator, false, generator);
       
    64                 }
       
    65                 else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
       
    66                     // If the stream is SORTED then it should also be ORDERED so the following will also
       
    67                     // preserve the sort order
       
    68                     TerminalOp<T, LinkedHashSet<T>> reduceOp
       
    69                             = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
       
    70                                                                      LinkedHashSet::addAll);
       
    71                     return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
       
    72                 }
       
    73                 else {
       
    74                     // Holder of null state since ConcurrentHashMap does not support null values
       
    75                     AtomicBoolean seenNull = new AtomicBoolean(false);
       
    76                     ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
       
    77                     TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
       
    78                         if (t == null)
       
    79                             seenNull.set(true);
       
    80                         else
       
    81                             map.putIfAbsent(t, Boolean.TRUE);
       
    82                     }, false);
       
    83                     forEachOp.evaluateParallel(helper, spliterator);
       
    84 
       
    85                     // If null has been seen then copy the key set into a HashSet that supports null values
       
    86                     // and add null
       
    87                     Set<T> keys = map.keySet();
       
    88                     if (seenNull.get()) {
       
    89                         // TODO Implement a more efficient set-union view, rather than copying
       
    90                         keys = new HashSet<>(keys);
       
    91                         keys.add(null);
       
    92                     }
       
    93                     return Nodes.node(keys);
       
    94                 }
       
    95             }
       
    96 
       
    97             @Override
       
    98             Sink<T> opWrapSink(int flags, Sink<T> sink) {
       
    99                 Objects.requireNonNull(sink);
       
   100 
       
   101                 if (StreamOpFlag.DISTINCT.isKnown(flags)) {
       
   102                     return sink;
       
   103                 } else if (StreamOpFlag.SORTED.isKnown(flags)) {
       
   104                     return new Sink.ChainedReference<T>(sink) {
       
   105                         boolean seenNull;
       
   106                         T lastSeen;
       
   107 
       
   108                         @Override
       
   109                         public void begin(long size) {
       
   110                             seenNull = false;
       
   111                             lastSeen = null;
       
   112                             downstream.begin(-1);
       
   113                         }
       
   114 
       
   115                         @Override
       
   116                         public void end() {
       
   117                             seenNull = false;
       
   118                             lastSeen = null;
       
   119                             downstream.end();
       
   120                         }
       
   121 
       
   122                         @Override
       
   123                         public void accept(T t) {
       
   124                             if (t == null) {
       
   125                                 if (!seenNull) {
       
   126                                     seenNull = true;
       
   127                                     downstream.accept(lastSeen = null);
       
   128                                 }
       
   129                             } else if (lastSeen == null || !t.equals(lastSeen)) {
       
   130                                 downstream.accept(lastSeen = t);
       
   131                             }
       
   132                         }
       
   133                     };
       
   134                 } else {
       
   135                     return new Sink.ChainedReference<T>(sink) {
       
   136                         Set<T> seen;
       
   137 
       
   138                         @Override
       
   139                         public void begin(long size) {
       
   140                             seen = new HashSet<>();
       
   141                             downstream.begin(-1);
       
   142                         }
       
   143 
       
   144                         @Override
       
   145                         public void end() {
       
   146                             seen = null;
       
   147                             downstream.end();
       
   148                         }
       
   149 
       
   150                         @Override
       
   151                         public void accept(T t) {
       
   152                             if (!seen.contains(t)) {
       
   153                                 seen.add(t);
       
   154                                 downstream.accept(t);
       
   155                             }
       
   156                         }
       
   157                     };
       
   158                 }
       
   159             }
       
   160         };
       
   161     }
       
   162 }