--- a/jdk/src/java.base/share/classes/java/util/stream/Collectors.java Wed Jul 05 20:22:22 2017 +0200
+++ b/jdk/src/java.base/share/classes/java/util/stream/Collectors.java Fri Feb 27 09:58:25 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -404,6 +404,54 @@
}
/**
+ * Adapts a {@code Collector} accepting elements of type {@code U} to one
+ * accepting elements of type {@code T} by applying a flat mapping function
+ * to each input element before accumulation. The flat mapping function
+ * maps an input element to a {@link Stream stream} covering zero or more
+ * output elements that are then accumulated downstream. Each mapped stream
+ * is {@link java.util.stream.BaseStream#close() closed} after its contents
+ * have been placed downstream. (If a mapped stream is {@code null}
+ * an empty stream is used, instead.)
+ *
+ * @apiNote
+ * The {@code flatMapping()} collectors are most useful when used in a
+ * multi-level reduction, such as downstream of a {@code groupingBy} or
+ * {@code partitioningBy}. For example, given a stream of
+ * {@code Order}, to accumulate the set of line items for each customer:
+ * <pre>{@code
+ * Map<String, Set<LineItem>> itemsByCustomerName
+ * = orders.stream().collect(groupingBy(Order::getCustomerName,
+ * flatMapping(order -> order.getLineItems().stream(), toSet())));
+ * }</pre>
+ *
+ * @param <T> the type of the input elements
+ * @param <U> type of elements accepted by downstream collector
+ * @param <A> intermediate accumulation type of the downstream collector
+ * @param <R> result type of collector
+ * @param mapper a function to be applied to the input elements, which
+ * returns a stream of results
+ * @param downstream a collector which will receive the elements of the
+ * stream returned by mapper
+ * @return a collector which applies the mapping function to the input
+ * elements and provides the flat mapped results to the downstream collector
+ * @since 1.9
+ */
+ public static <T, U, A, R>
+ Collector<T, ?, R> flatMapping(Function<? super T, ? extends Stream<? extends U>> mapper,
+ Collector<? super U, A, R> downstream) {
+ BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
+ return new CollectorImpl<>(downstream.supplier(),
+ (r, t) -> {
+ try (Stream<? extends U> result = mapper.apply(t)) {
+ if (result != null)
+ result.sequential().forEach(u -> downstreamAccumulator.accept(r, u));
+ }
+ },
+ downstream.combiner(), downstream.finisher(),
+ downstream.characteristics());
+ }
+
+ /**
* Adapts a {@code Collector} to perform an additional finishing
* transformation. For example, one could adapt the {@link #toList()}
* collector to always produce an immutable list with:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/CollectorsTest.java Fri Feb 27 09:58:25 2015 +0100
@@ -0,0 +1,701 @@
+/*
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.stream;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.LambdaTestHelpers;
+import java.util.stream.OpTestCase;
+import java.util.stream.Stream;
+import java.util.stream.StreamOpFlagTestHelper;
+import java.util.stream.StreamTestDataProvider;
+import java.util.stream.TestData;
+
+import org.testng.annotations.Test;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.flatMapping;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.groupingByConcurrent;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.reducing;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toConcurrentMap;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static java.util.stream.LambdaTestHelpers.assertContents;
+import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
+import static java.util.stream.LambdaTestHelpers.mDoubler;
+
+/*
+ * @test
+ * @bug 8071600
+ * @summary Test for collectors.
+ */
+public class CollectorsTest extends OpTestCase {
+
+ private static abstract class CollectorAssertion<T, U> {
+ abstract void assertValue(U value,
+ Supplier<Stream<T>> source,
+ boolean ordered) throws ReflectiveOperationException;
+ }
+
+ static class MappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
+ private final Function<T, V> mapper;
+ private final CollectorAssertion<V, R> downstream;
+
+ MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream) {
+ this.mapper = mapper;
+ this.downstream = downstream;
+ }
+
+ @Override
+ void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
+ downstream.assertValue(value,
+ () -> source.get().map(mapper::apply),
+ ordered);
+ }
+ }
+
+ static class FlatMappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
+ private final Function<T, Stream<V>> mapper;
+ private final CollectorAssertion<V, R> downstream;
+
+ FlatMappingAssertion(Function<T, Stream<V>> mapper,
+ CollectorAssertion<V, R> downstream) {
+ this.mapper = mapper;
+ this.downstream = downstream;
+ }
+
+ @Override
+ void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
+ downstream.assertValue(value,
+ () -> source.get().flatMap(mapper::apply),
+ ordered);
+ }
+ }
+
+ static class GroupingByAssertion<T, K, V, M extends Map<K, ? extends V>> extends CollectorAssertion<T, M> {
+ private final Class<? extends Map> clazz;
+ private final Function<T, K> classifier;
+ private final CollectorAssertion<T,V> downstream;
+
+ GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz,
+ CollectorAssertion<T, V> downstream) {
+ this.clazz = clazz;
+ this.classifier = classifier;
+ this.downstream = downstream;
+ }
+
+ @Override
+ void assertValue(M map,
+ Supplier<Stream<T>> source,
+ boolean ordered) throws ReflectiveOperationException {
+ if (!clazz.isAssignableFrom(map.getClass()))
+ fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass()));
+ assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
+ for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
+ K key = entry.getKey();
+ downstream.assertValue(entry.getValue(),
+ () -> source.get().filter(e -> classifier.apply(e).equals(key)),
+ ordered);
+ }
+ }
+ }
+
+ static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends CollectorAssertion<T, M> {
+ private final Class<? extends Map> clazz;
+ private final Function<T, K> keyFn;
+ private final Function<T, V> valueFn;
+ private final BinaryOperator<V> mergeFn;
+
+ ToMapAssertion(Function<T, K> keyFn,
+ Function<T, V> valueFn,
+ BinaryOperator<V> mergeFn,
+ Class<? extends Map> clazz) {
+ this.clazz = clazz;
+ this.keyFn = keyFn;
+ this.valueFn = valueFn;
+ this.mergeFn = mergeFn;
+ }
+
+ @Override
+ void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
+ if (!clazz.isAssignableFrom(map.getClass()))
+ fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass()));
+ Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
+ assertEquals(uniqueKeys, map.keySet());
+ source.get().forEach(t -> {
+ K key = keyFn.apply(t);
+ V v = source.get()
+ .filter(e -> key.equals(keyFn.apply(e)))
+ .map(valueFn)
+ .reduce(mergeFn)
+ .get();
+ assertEquals(map.get(key), v);
+ });
+ }
+ }
+
+ static class PartitioningByAssertion<T, D> extends CollectorAssertion<T, Map<Boolean,D>> {
+ private final Predicate<T> predicate;
+ private final CollectorAssertion<T,D> downstream;
+
+ PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream) {
+ this.predicate = predicate;
+ this.downstream = downstream;
+ }
+
+ @Override
+ void assertValue(Map<Boolean, D> map,
+ Supplier<Stream<T>> source,
+ boolean ordered) throws ReflectiveOperationException {
+ if (!Map.class.isAssignableFrom(map.getClass()))
+ fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass()));
+ assertEquals(2, map.size());
+ downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
+ downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
+ }
+ }
+
+ static class ToListAssertion<T> extends CollectorAssertion<T, List<T>> {
+ @Override
+ void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
+ throws ReflectiveOperationException {
+ if (!List.class.isAssignableFrom(value.getClass()))
+ fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass()));
+ Stream<T> stream = source.get();
+ List<T> result = new ArrayList<>();
+ for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
+ result.add(it.next());
+ if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
+ assertContents(value, result);
+ else
+ assertContentsUnordered(value, result);
+ }
+ }
+
+ static class ToCollectionAssertion<T> extends CollectorAssertion<T, Collection<T>> {
+ private final Class<? extends Collection> clazz;
+ private final boolean targetOrdered;
+
+ ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
+ this.clazz = clazz;
+ this.targetOrdered = targetOrdered;
+ }
+
+ @Override
+ void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
+ throws ReflectiveOperationException {
+ if (!clazz.isAssignableFrom(value.getClass()))
+ fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass()));
+ Stream<T> stream = source.get();
+ Collection<T> result = clazz.newInstance();
+ for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
+ result.add(it.next());
+ if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
+ assertContents(value, result);
+ else
+ assertContentsUnordered(value, result);
+ }
+ }
+
+ static class ReducingAssertion<T, U> extends CollectorAssertion<T, U> {
+ private final U identity;
+ private final Function<T, U> mapper;
+ private final BinaryOperator<U> reducer;
+
+ ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
+ this.identity = identity;
+ this.mapper = mapper;
+ this.reducer = reducer;
+ }
+
+ @Override
+ void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
+ throws ReflectiveOperationException {
+ Optional<U> reduced = source.get().map(mapper).reduce(reducer);
+ if (value == null)
+ assertTrue(!reduced.isPresent());
+ else if (!reduced.isPresent()) {
+ assertEquals(value, identity);
+ }
+ else {
+ assertEquals(value, reduced.get());
+ }
+ }
+ }
+
+ private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
+ return (act, exp, ord, par) -> {
+ if (par && (!ordered || !ord)) {
+ CollectorsTest.nestedMapEqualityAssertion(act, exp);
+ }
+ else {
+ LambdaTestHelpers.assertContentsEqual(act, exp);
+ }
+ };
+ }
+
+ private<T, M extends Map>
+ void exerciseMapCollection(TestData<T, Stream<T>> data,
+ Collector<T, ?, ? extends M> collector,
+ CollectorAssertion<T, M> assertion)
+ throws ReflectiveOperationException {
+ boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
+
+ M m = withData(data)
+ .terminal(s -> s.collect(collector))
+ .resultAsserter(mapTabulationAsserter(ordered))
+ .exercise();
+ assertion.assertValue(m, () -> data.stream(), ordered);
+
+ m = withData(data)
+ .terminal(s -> s.unordered().collect(collector))
+ .resultAsserter(mapTabulationAsserter(ordered))
+ .exercise();
+ assertion.assertValue(m, () -> data.stream(), false);
+ }
+
+ private static void nestedMapEqualityAssertion(Object o1, Object o2) {
+ if (o1 instanceof Map) {
+ Map m1 = (Map) o1;
+ Map m2 = (Map) o2;
+ assertContentsUnordered(m1.keySet(), m2.keySet());
+ for (Object k : m1.keySet())
+ nestedMapEqualityAssertion(m1.get(k), m2.get(k));
+ }
+ else if (o1 instanceof Collection) {
+ assertContentsUnordered(((Collection) o1), ((Collection) o2));
+ }
+ else
+ assertEquals(o1, o2);
+ }
+
+ private<T, R> void assertCollect(TestData.OfRef<T> data,
+ Collector<T, ?, R> collector,
+ Function<Stream<T>, R> streamReduction) {
+ R check = streamReduction.apply(data.stream());
+ withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ assertCollect(data, Collectors.reducing(0, Integer::sum),
+ s -> s.reduce(0, Integer::sum));
+ assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
+ s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
+ assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
+ s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
+
+ assertCollect(data, Collectors.reducing(Integer::sum),
+ s -> s.reduce(Integer::sum));
+ assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
+ s -> s.min(Integer::compare));
+ assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
+ s -> s.max(Integer::compare));
+
+ assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
+ s -> s.map(x -> x*2).reduce(0, Integer::sum));
+
+ assertCollect(data, Collectors.summingLong(x -> x * 2L),
+ s -> s.map(x -> x*2L).reduce(0L, Long::sum));
+ assertCollect(data, Collectors.summingInt(x -> x * 2),
+ s -> s.map(x -> x*2).reduce(0, Integer::sum));
+ assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
+ s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
+
+ assertCollect(data, Collectors.averagingInt(x -> x * 2),
+ s -> s.mapToInt(x -> x * 2).average().orElse(0));
+ assertCollect(data, Collectors.averagingLong(x -> x * 2),
+ s -> s.mapToLong(x -> x * 2).average().orElse(0));
+ assertCollect(data, Collectors.averagingDouble(x -> x * 2),
+ s -> s.mapToDouble(x -> x * 2).average().orElse(0));
+
+ // Test explicit Collector.of
+ Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
+ (a, b) -> {
+ a[0] += b * 2;
+ a[1]++;
+ },
+ (a, b) -> {
+ a[0] += b[0];
+ a[1] += b[1];
+ return a;
+ },
+ a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
+ assertCollect(data, avg2xint,
+ s -> s.mapToInt(x -> x * 2).average().orElse(0));
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testJoining(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ withData(data)
+ .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
+ .expectedResult(join(data, ""))
+ .exercise();
+
+ Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
+ withData(data)
+ .terminal(s -> s.map(Object::toString).collect(likeJoining))
+ .expectedResult(join(data, ""))
+ .exercise();
+
+ withData(data)
+ .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
+ .expectedResult(join(data, ","))
+ .exercise();
+
+ withData(data)
+ .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
+ .expectedResult("[" + join(data, ",") + "]")
+ .exercise();
+
+ withData(data)
+ .terminal(s -> s.map(Object::toString)
+ .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
+ .toString())
+ .expectedResult(join(data, ""))
+ .exercise();
+
+ withData(data)
+ .terminal(s -> s.map(Object::toString)
+ .collect(() -> new StringJoiner(","),
+ (sj, cs) -> sj.add(cs),
+ (j1, j2) -> j1.merge(j2))
+ .toString())
+ .expectedResult(join(data, ","))
+ .exercise();
+
+ withData(data)
+ .terminal(s -> s.map(Object::toString)
+ .collect(() -> new StringJoiner(",", "[", "]"),
+ (sj, cs) -> sj.add(cs),
+ (j1, j2) -> j1.merge(j2))
+ .toString())
+ .expectedResult("[" + join(data, ",") + "]")
+ .exercise();
+ }
+
+ private<T> String join(TestData.OfRef<T> data, String delim) {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (T i : data) {
+ if (!first)
+ sb.append(delim);
+ sb.append(i.toString());
+ first = false;
+ }
+ return sb.toString();
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ Function<Integer, Integer> keyFn = i -> i * 2;
+ Function<Integer, Integer> valueFn = i -> i * 4;
+
+ List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
+ Set<Integer> dataAsSet = new HashSet<>(dataAsList);
+
+ BinaryOperator<Integer> sum = Integer::sum;
+ for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
+ (u, v) -> v,
+ sum)) {
+ try {
+ exerciseMapCollection(data, toMap(keyFn, valueFn),
+ new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
+ if (dataAsList.size() != dataAsSet.size())
+ fail("Expected ISE on input with duplicates");
+ }
+ catch (IllegalStateException e) {
+ if (dataAsList.size() == dataAsSet.size())
+ fail("Expected no ISE on input without duplicates");
+ }
+
+ exerciseMapCollection(data, toMap(keyFn, valueFn, op),
+ new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
+
+ exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new),
+ new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
+ }
+
+ // For concurrent maps, only use commutative merge functions
+ try {
+ exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn),
+ new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
+ if (dataAsList.size() != dataAsSet.size())
+ fail("Expected ISE on input with duplicates");
+ }
+ catch (IllegalStateException e) {
+ if (dataAsList.size() == dataAsSet.size())
+ fail("Expected no ISE on input without duplicates");
+ }
+
+ exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum),
+ new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
+
+ exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
+ new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testSimpleGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ Function<Integer, Integer> classifier = i -> i % 3;
+
+ // Single-level groupBy
+ exerciseMapCollection(data, groupingBy(classifier),
+ new GroupingByAssertion<>(classifier, HashMap.class,
+ new ToListAssertion<>()));
+ exerciseMapCollection(data, groupingByConcurrent(classifier),
+ new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
+ new ToListAssertion<>()));
+
+ // With explicit constructors
+ exerciseMapCollection(data,
+ groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
+ new GroupingByAssertion<>(classifier, TreeMap.class,
+ new ToCollectionAssertion<Integer>(HashSet.class, false)));
+ exerciseMapCollection(data,
+ groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
+ toCollection(HashSet::new)),
+ new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
+ new ToCollectionAssertion<Integer>(HashSet.class, false)));
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testGroupingByWithMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ Function<Integer, Integer> classifier = i -> i % 3;
+ Function<Integer, Integer> mapper = i -> i * 2;
+
+ exerciseMapCollection(data,
+ groupingBy(classifier, mapping(mapper, toList())),
+ new GroupingByAssertion<>(classifier, HashMap.class,
+ new MappingAssertion<>(mapper,
+ new ToListAssertion<>())));
+ }
+
+ @Test
+ public void testFlatMappingClose() {
+ Function<Integer, Integer> classifier = i -> i;
+ AtomicInteger ai = new AtomicInteger();
+ Function<Integer, Stream<Integer>> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement);
+ Map<Integer, List<Integer>> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList())));
+ assertEquals(m.size(), ai.get());
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testGroupingByWithFlatMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ Function<Integer, Integer> classifier = i -> i % 3;
+ Function<Integer, Stream<Integer>> flatMapperByNull = i -> null;
+ Function<Integer, Stream<Integer>> flatMapperBy0 = i -> Stream.empty();
+ Function<Integer, Stream<Integer>> flatMapperBy2 = i -> Stream.of(i, i);
+
+ exerciseMapCollection(data,
+ groupingBy(classifier, flatMapping(flatMapperByNull, toList())),
+ new GroupingByAssertion<>(classifier, HashMap.class,
+ new FlatMappingAssertion<>(flatMapperBy0,
+ new ToListAssertion<>())));
+ exerciseMapCollection(data,
+ groupingBy(classifier, flatMapping(flatMapperBy0, toList())),
+ new GroupingByAssertion<>(classifier, HashMap.class,
+ new FlatMappingAssertion<>(flatMapperBy0,
+ new ToListAssertion<>())));
+ exerciseMapCollection(data,
+ groupingBy(classifier, flatMapping(flatMapperBy2, toList())),
+ new GroupingByAssertion<>(classifier, HashMap.class,
+ new FlatMappingAssertion<>(flatMapperBy2,
+ new ToListAssertion<>())));
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testTwoLevelGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ Function<Integer, Integer> classifier = i -> i % 6;
+ Function<Integer, Integer> classifier2 = i -> i % 23;
+
+ // Two-level groupBy
+ exerciseMapCollection(data,
+ groupingBy(classifier, groupingBy(classifier2)),
+ new GroupingByAssertion<>(classifier, HashMap.class,
+ new GroupingByAssertion<>(classifier2, HashMap.class,
+ new ToListAssertion<>())));
+ // with concurrent as upstream
+ exerciseMapCollection(data,
+ groupingByConcurrent(classifier, groupingBy(classifier2)),
+ new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
+ new GroupingByAssertion<>(classifier2, HashMap.class,
+ new ToListAssertion<>())));
+ // with concurrent as downstream
+ exerciseMapCollection(data,
+ groupingBy(classifier, groupingByConcurrent(classifier2)),
+ new GroupingByAssertion<>(classifier, HashMap.class,
+ new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
+ new ToListAssertion<>())));
+ // with concurrent as upstream and downstream
+ exerciseMapCollection(data,
+ groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
+ new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
+ new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
+ new ToListAssertion<>())));
+
+ // With explicit constructors
+ exerciseMapCollection(data,
+ groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
+ new GroupingByAssertion<>(classifier, TreeMap.class,
+ new GroupingByAssertion<>(classifier2, TreeMap.class,
+ new ToCollectionAssertion<Integer>(HashSet.class, false))));
+ // with concurrent as upstream
+ exerciseMapCollection(data,
+ groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
+ new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
+ new GroupingByAssertion<>(classifier2, TreeMap.class,
+ new ToListAssertion<>())));
+ // with concurrent as downstream
+ exerciseMapCollection(data,
+ groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
+ new GroupingByAssertion<>(classifier, TreeMap.class,
+ new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
+ new ToListAssertion<>())));
+ // with concurrent as upstream and downstream
+ exerciseMapCollection(data,
+ groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
+ new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
+ new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
+ new ToListAssertion<>())));
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testGroupubgByWithReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ Function<Integer, Integer> classifier = i -> i % 3;
+
+ // Single-level simple reduce
+ exerciseMapCollection(data,
+ groupingBy(classifier, reducing(0, Integer::sum)),
+ new GroupingByAssertion<>(classifier, HashMap.class,
+ new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
+ // with concurrent
+ exerciseMapCollection(data,
+ groupingByConcurrent(classifier, reducing(0, Integer::sum)),
+ new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
+ new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
+
+ // With explicit constructors
+ exerciseMapCollection(data,
+ groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
+ new GroupingByAssertion<>(classifier, TreeMap.class,
+ new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
+ // with concurrent
+ exerciseMapCollection(data,
+ groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
+ new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
+ new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
+
+ // Single-level map-reduce
+ exerciseMapCollection(data,
+ groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
+ new GroupingByAssertion<>(classifier, HashMap.class,
+ new ReducingAssertion<>(0, mDoubler, Integer::sum)));
+ // with concurrent
+ exerciseMapCollection(data,
+ groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
+ new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
+ new ReducingAssertion<>(0, mDoubler, Integer::sum)));
+
+ // With explicit constructors
+ exerciseMapCollection(data,
+ groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
+ new GroupingByAssertion<>(classifier, TreeMap.class,
+ new ReducingAssertion<>(0, mDoubler, Integer::sum)));
+ // with concurrent
+ exerciseMapCollection(data,
+ groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
+ new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
+ new ReducingAssertion<>(0, mDoubler, Integer::sum)));
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testSimplePartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ Predicate<Integer> classifier = i -> i % 3 == 0;
+
+ // Single-level partition to downstream List
+ exerciseMapCollection(data,
+ partitioningBy(classifier),
+ new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
+ exerciseMapCollection(data,
+ partitioningBy(classifier, toList()),
+ new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testTwoLevelPartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ Predicate<Integer> classifier = i -> i % 3 == 0;
+ Predicate<Integer> classifier2 = i -> i % 7 == 0;
+
+ // Two level partition
+ exerciseMapCollection(data,
+ partitioningBy(classifier, partitioningBy(classifier2)),
+ new PartitioningByAssertion<>(classifier,
+ new PartitioningByAssertion(classifier2, new ToListAssertion<>())));
+
+ // Two level partition with reduce
+ exerciseMapCollection(data,
+ partitioningBy(classifier, reducing(0, Integer::sum)),
+ new PartitioningByAssertion<>(classifier,
+ new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
+ List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
+ List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
+ assertEquals(asList, asImmutableList);
+ try {
+ asImmutableList.add(0);
+ fail("Expecting immutable result");
+ }
+ catch (UnsupportedOperationException ignored) { }
+ }
+
+}
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java Wed Jul 05 20:22:22 2017 +0200
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java Fri Feb 27 09:58:25 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -34,8 +34,19 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
-import java.util.stream.*;
+import java.util.function.Supplier;
+import java.util.stream.DoubleStream;
+import java.util.stream.DoubleStreamTestDataProvider;
+import java.util.stream.IntStream;
+import java.util.stream.IntStreamTestDataProvider;
+import java.util.stream.LongStream;
+import java.util.stream.LongStreamTestDataProvider;
+import java.util.stream.OpTestCase;
+import java.util.stream.Stream;
+import java.util.stream.StreamTestDataProvider;
+import java.util.stream.TestData;
import static java.util.stream.LambdaTestHelpers.*;
import static java.util.stream.ThowableHelper.checkNPE;
@@ -66,6 +77,59 @@
exerciseOps(TestData.Factory.ofArray("LONG_STRING", new String[] {LONG_STRING}), s -> s.flatMap(flattenChars));
}
+ @Test
+ public void testClose() {
+ AtomicInteger before = new AtomicInteger();
+ AtomicInteger onClose = new AtomicInteger();
+
+ Supplier<Stream<Integer>> s = () -> {
+ before.set(0); onClose.set(0);
+ return Stream.of(1, 2).peek(e -> before.getAndIncrement());
+ };
+
+ s.get().flatMap(i -> Stream.of(i, i).onClose(onClose::getAndIncrement)).count();
+ assertEquals(before.get(), onClose.get());
+
+ s.get().flatMapToInt(i -> IntStream.of(i, i).onClose(onClose::getAndIncrement)).count();
+ assertEquals(before.get(), onClose.get());
+
+ s.get().flatMapToLong(i -> LongStream.of(i, i).onClose(onClose::getAndIncrement)).count();
+ assertEquals(before.get(), onClose.get());
+
+ s.get().flatMapToDouble(i -> DoubleStream.of(i, i).onClose(onClose::getAndIncrement)).count();
+ assertEquals(before.get(), onClose.get());
+ }
+
+ @Test
+ public void testIntClose() {
+ AtomicInteger before = new AtomicInteger();
+ AtomicInteger onClose = new AtomicInteger();
+
+ IntStream.of(1, 2).peek(e -> before.getAndIncrement()).
+ flatMap(i -> IntStream.of(i, i).onClose(onClose::getAndIncrement)).count();
+ assertEquals(before.get(), onClose.get());
+ }
+
+ @Test
+ public void testLongClose() {
+ AtomicInteger before = new AtomicInteger();
+ AtomicInteger onClose = new AtomicInteger();
+
+ LongStream.of(1, 2).peek(e -> before.getAndIncrement()).
+ flatMap(i -> LongStream.of(i, i).onClose(onClose::getAndIncrement)).count();
+ assertEquals(before.get(), onClose.get());
+ }
+
+ @Test
+ public void testDoubleClose() {
+ AtomicInteger before = new AtomicInteger();
+ AtomicInteger onClose = new AtomicInteger();
+
+ DoubleStream.of(1, 2).peek(e -> before.getAndIncrement()).
+ flatMap(i -> DoubleStream.of(i, i).onClose(onClose::getAndIncrement)).count();
+ assertEquals(before.get(), onClose.get());
+ }
+
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testOps(String name, TestData.OfRef<Integer> data) {
Collection<Integer> result = exerciseOps(data, s -> s.flatMap(mfId));
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/TabulatorsTest.java Wed Jul 05 20:22:22 2017 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,621 +0,0 @@
-/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package org.openjdk.tests.java.util.stream;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.StringJoiner;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BinaryOperator;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
-import java.util.stream.LambdaTestHelpers;
-import java.util.stream.OpTestCase;
-import java.util.stream.Stream;
-import java.util.stream.StreamOpFlagTestHelper;
-import java.util.stream.StreamTestDataProvider;
-import java.util.stream.TestData;
-
-import org.testng.annotations.Test;
-
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.groupingByConcurrent;
-import static java.util.stream.Collectors.partitioningBy;
-import static java.util.stream.Collectors.reducing;
-import static java.util.stream.Collectors.toCollection;
-import static java.util.stream.Collectors.toConcurrentMap;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
-import static java.util.stream.Collectors.toSet;
-import static java.util.stream.LambdaTestHelpers.assertContents;
-import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
-import static java.util.stream.LambdaTestHelpers.mDoubler;
-
-/**
- * TabulatorsTest
- *
- * @author Brian Goetz
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class TabulatorsTest extends OpTestCase {
-
- private static abstract class TabulationAssertion<T, U> {
- abstract void assertValue(U value,
- Supplier<Stream<T>> source,
- boolean ordered) throws ReflectiveOperationException;
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- static class GroupedMapAssertion<T, K, V, M extends Map<K, ? extends V>> extends TabulationAssertion<T, M> {
- private final Class<? extends Map> clazz;
- private final Function<T, K> classifier;
- private final TabulationAssertion<T,V> downstream;
-
- protected GroupedMapAssertion(Function<T, K> classifier,
- Class<? extends Map> clazz,
- TabulationAssertion<T, V> downstream) {
- this.clazz = clazz;
- this.classifier = classifier;
- this.downstream = downstream;
- }
-
- void assertValue(M map,
- Supplier<Stream<T>> source,
- boolean ordered) throws ReflectiveOperationException {
- if (!clazz.isAssignableFrom(map.getClass()))
- fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass()));
- assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
- for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
- K key = entry.getKey();
- downstream.assertValue(entry.getValue(),
- () -> source.get().filter(e -> classifier.apply(e).equals(key)),
- ordered);
- }
- }
- }
-
- static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends TabulationAssertion<T, M> {
- private final Class<? extends Map> clazz;
- private final Function<T, K> keyFn;
- private final Function<T, V> valueFn;
- private final BinaryOperator<V> mergeFn;
-
- ToMapAssertion(Function<T, K> keyFn,
- Function<T, V> valueFn,
- BinaryOperator<V> mergeFn,
- Class<? extends Map> clazz) {
- this.clazz = clazz;
- this.keyFn = keyFn;
- this.valueFn = valueFn;
- this.mergeFn = mergeFn;
- }
-
- @Override
- void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
- Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
- assertTrue(clazz.isAssignableFrom(map.getClass()));
- assertEquals(uniqueKeys, map.keySet());
- source.get().forEach(t -> {
- K key = keyFn.apply(t);
- V v = source.get()
- .filter(e -> key.equals(keyFn.apply(e)))
- .map(valueFn)
- .reduce(mergeFn)
- .get();
- assertEquals(map.get(key), v);
- });
- }
- }
-
- static class PartitionAssertion<T, D> extends TabulationAssertion<T, Map<Boolean,D>> {
- private final Predicate<T> predicate;
- private final TabulationAssertion<T,D> downstream;
-
- protected PartitionAssertion(Predicate<T> predicate,
- TabulationAssertion<T, D> downstream) {
- this.predicate = predicate;
- this.downstream = downstream;
- }
-
- void assertValue(Map<Boolean, D> map,
- Supplier<Stream<T>> source,
- boolean ordered) throws ReflectiveOperationException {
- if (!Map.class.isAssignableFrom(map.getClass()))
- fail(String.format("Class mismatch in PartitionAssertion: %s", map.getClass()));
- assertEquals(2, map.size());
- downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
- downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
- }
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- static class ListAssertion<T> extends TabulationAssertion<T, List<T>> {
- @Override
- void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
- throws ReflectiveOperationException {
- if (!List.class.isAssignableFrom(value.getClass()))
- fail(String.format("Class mismatch in ListAssertion: %s", value.getClass()));
- Stream<T> stream = source.get();
- List<T> result = new ArrayList<>();
- for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
- result.add(it.next());
- if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
- assertContents(value, result);
- else
- assertContentsUnordered(value, result);
- }
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- static class CollectionAssertion<T> extends TabulationAssertion<T, Collection<T>> {
- private final Class<? extends Collection> clazz;
- private final boolean targetOrdered;
-
- protected CollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
- this.clazz = clazz;
- this.targetOrdered = targetOrdered;
- }
-
- @Override
- void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
- throws ReflectiveOperationException {
- if (!clazz.isAssignableFrom(value.getClass()))
- fail(String.format("Class mismatch in CollectionAssertion: %s, %s", clazz, value.getClass()));
- Stream<T> stream = source.get();
- Collection<T> result = clazz.newInstance();
- for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
- result.add(it.next());
- if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
- assertContents(value, result);
- else
- assertContentsUnordered(value, result);
- }
- }
-
- static class ReduceAssertion<T, U> extends TabulationAssertion<T, U> {
- private final U identity;
- private final Function<T, U> mapper;
- private final BinaryOperator<U> reducer;
-
- ReduceAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
- this.identity = identity;
- this.mapper = mapper;
- this.reducer = reducer;
- }
-
- @Override
- void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
- throws ReflectiveOperationException {
- Optional<U> reduced = source.get().map(mapper).reduce(reducer);
- if (value == null)
- assertTrue(!reduced.isPresent());
- else if (!reduced.isPresent()) {
- assertEquals(value, identity);
- }
- else {
- assertEquals(value, reduced.get());
- }
- }
- }
-
- private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
- return (act, exp, ord, par) -> {
- if (par && (!ordered || !ord)) {
- TabulatorsTest.nestedMapEqualityAssertion(act, exp);
- }
- else {
- LambdaTestHelpers.assertContentsEqual(act, exp);
- }
- };
- }
-
- private<T, M extends Map>
- void exerciseMapTabulation(TestData<T, Stream<T>> data,
- Collector<T, ?, ? extends M> collector,
- TabulationAssertion<T, M> assertion)
- throws ReflectiveOperationException {
- boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
-
- M m = withData(data)
- .terminal(s -> s.collect(collector))
- .resultAsserter(mapTabulationAsserter(ordered))
- .exercise();
- assertion.assertValue(m, () -> data.stream(), ordered);
-
- m = withData(data)
- .terminal(s -> s.unordered().collect(collector))
- .resultAsserter(mapTabulationAsserter(ordered))
- .exercise();
- assertion.assertValue(m, () -> data.stream(), false);
- }
-
- private static void nestedMapEqualityAssertion(Object o1, Object o2) {
- if (o1 instanceof Map) {
- Map m1 = (Map) o1;
- Map m2 = (Map) o2;
- assertContentsUnordered(m1.keySet(), m2.keySet());
- for (Object k : m1.keySet())
- nestedMapEqualityAssertion(m1.get(k), m2.get(k));
- }
- else if (o1 instanceof Collection) {
- assertContentsUnordered(((Collection) o1), ((Collection) o2));
- }
- else
- assertEquals(o1, o2);
- }
-
- private<T, R> void assertCollect(TestData.OfRef<T> data,
- Collector<T, ?, R> collector,
- Function<Stream<T>, R> streamReduction) {
- R check = streamReduction.apply(data.stream());
- withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
- }
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
- assertCollect(data, Collectors.reducing(0, Integer::sum),
- s -> s.reduce(0, Integer::sum));
- assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
- s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
- assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
- s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
-
- assertCollect(data, Collectors.reducing(Integer::sum),
- s -> s.reduce(Integer::sum));
- assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
- s -> s.min(Integer::compare));
- assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
- s -> s.max(Integer::compare));
-
- assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
- s -> s.map(x -> x*2).reduce(0, Integer::sum));
-
- assertCollect(data, Collectors.summingLong(x -> x * 2L),
- s -> s.map(x -> x*2L).reduce(0L, Long::sum));
- assertCollect(data, Collectors.summingInt(x -> x * 2),
- s -> s.map(x -> x*2).reduce(0, Integer::sum));
- assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
- s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
-
- assertCollect(data, Collectors.averagingInt(x -> x * 2),
- s -> s.mapToInt(x -> x * 2).average().orElse(0));
- assertCollect(data, Collectors.averagingLong(x -> x * 2),
- s -> s.mapToLong(x -> x * 2).average().orElse(0));
- assertCollect(data, Collectors.averagingDouble(x -> x * 2),
- s -> s.mapToDouble(x -> x * 2).average().orElse(0));
-
- // Test explicit Collector.of
- Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
- (a, b) -> {
- a[0] += b * 2;
- a[1]++;
- },
- (a, b) -> {
- a[0] += b[0];
- a[1] += b[1];
- return a;
- },
- a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
- assertCollect(data, avg2xint,
- s -> s.mapToInt(x -> x * 2).average().orElse(0));
- }
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testJoin(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
- withData(data)
- .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
- .expectedResult(join(data, ""))
- .exercise();
-
- Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
- withData(data)
- .terminal(s -> s.map(Object::toString).collect(likeJoining))
- .expectedResult(join(data, ""))
- .exercise();
-
- withData(data)
- .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
- .expectedResult(join(data, ","))
- .exercise();
-
- withData(data)
- .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
- .expectedResult("[" + join(data, ",") + "]")
- .exercise();
-
- withData(data)
- .terminal(s -> s.map(Object::toString)
- .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
- .toString())
- .expectedResult(join(data, ""))
- .exercise();
-
- withData(data)
- .terminal(s -> s.map(Object::toString)
- .collect(() -> new StringJoiner(","),
- (sj, cs) -> sj.add(cs),
- (j1, j2) -> j1.merge(j2))
- .toString())
- .expectedResult(join(data, ","))
- .exercise();
-
- withData(data)
- .terminal(s -> s.map(Object::toString)
- .collect(() -> new StringJoiner(",", "[", "]"),
- (sj, cs) -> sj.add(cs),
- (j1, j2) -> j1.merge(j2))
- .toString())
- .expectedResult("[" + join(data, ",") + "]")
- .exercise();
- }
-
- private<T> String join(TestData.OfRef<T> data, String delim) {
- StringBuilder sb = new StringBuilder();
- boolean first = true;
- for (T i : data) {
- if (!first)
- sb.append(delim);
- sb.append(i.toString());
- first = false;
- }
- return sb.toString();
- }
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
- Function<Integer, Integer> keyFn = i -> i * 2;
- Function<Integer, Integer> valueFn = i -> i * 4;
-
- List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
- Set<Integer> dataAsSet = new HashSet<>(dataAsList);
-
- BinaryOperator<Integer> sum = Integer::sum;
- for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
- (u, v) -> v,
- sum)) {
- try {
- exerciseMapTabulation(data, toMap(keyFn, valueFn),
- new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
- if (dataAsList.size() != dataAsSet.size())
- fail("Expected ISE on input with duplicates");
- }
- catch (IllegalStateException e) {
- if (dataAsList.size() == dataAsSet.size())
- fail("Expected no ISE on input without duplicates");
- }
-
- exerciseMapTabulation(data, toMap(keyFn, valueFn, op),
- new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
-
- exerciseMapTabulation(data, toMap(keyFn, valueFn, op, TreeMap::new),
- new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
- }
-
- // For concurrent maps, only use commutative merge functions
- try {
- exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn),
- new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
- if (dataAsList.size() != dataAsSet.size())
- fail("Expected ISE on input with duplicates");
- }
- catch (IllegalStateException e) {
- if (dataAsList.size() == dataAsSet.size())
- fail("Expected no ISE on input without duplicates");
- }
-
- exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum),
- new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
-
- exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
- new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
- }
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testSimpleGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
- Function<Integer, Integer> classifier = i -> i % 3;
-
- // Single-level groupBy
- exerciseMapTabulation(data, groupingBy(classifier),
- new GroupedMapAssertion<>(classifier, HashMap.class,
- new ListAssertion<>()));
- exerciseMapTabulation(data, groupingByConcurrent(classifier),
- new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
- new ListAssertion<>()));
-
- // With explicit constructors
- exerciseMapTabulation(data,
- groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
- new GroupedMapAssertion<>(classifier, TreeMap.class,
- new CollectionAssertion<Integer>(HashSet.class, false)));
- exerciseMapTabulation(data,
- groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
- toCollection(HashSet::new)),
- new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
- new CollectionAssertion<Integer>(HashSet.class, false)));
- }
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testTwoLevelGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
- Function<Integer, Integer> classifier = i -> i % 6;
- Function<Integer, Integer> classifier2 = i -> i % 23;
-
- // Two-level groupBy
- exerciseMapTabulation(data,
- groupingBy(classifier, groupingBy(classifier2)),
- new GroupedMapAssertion<>(classifier, HashMap.class,
- new GroupedMapAssertion<>(classifier2, HashMap.class,
- new ListAssertion<>())));
- // with concurrent as upstream
- exerciseMapTabulation(data,
- groupingByConcurrent(classifier, groupingBy(classifier2)),
- new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
- new GroupedMapAssertion<>(classifier2, HashMap.class,
- new ListAssertion<>())));
- // with concurrent as downstream
- exerciseMapTabulation(data,
- groupingBy(classifier, groupingByConcurrent(classifier2)),
- new GroupedMapAssertion<>(classifier, HashMap.class,
- new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
- new ListAssertion<>())));
- // with concurrent as upstream and downstream
- exerciseMapTabulation(data,
- groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
- new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
- new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
- new ListAssertion<>())));
-
- // With explicit constructors
- exerciseMapTabulation(data,
- groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
- new GroupedMapAssertion<>(classifier, TreeMap.class,
- new GroupedMapAssertion<>(classifier2, TreeMap.class,
- new CollectionAssertion<Integer>(HashSet.class, false))));
- // with concurrent as upstream
- exerciseMapTabulation(data,
- groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
- new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
- new GroupedMapAssertion<>(classifier2, TreeMap.class,
- new ListAssertion<>())));
- // with concurrent as downstream
- exerciseMapTabulation(data,
- groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
- new GroupedMapAssertion<>(classifier, TreeMap.class,
- new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
- new ListAssertion<>())));
- // with concurrent as upstream and downstream
- exerciseMapTabulation(data,
- groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
- new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
- new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
- new ListAssertion<>())));
- }
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testGroupedReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
- Function<Integer, Integer> classifier = i -> i % 3;
-
- // Single-level simple reduce
- exerciseMapTabulation(data,
- groupingBy(classifier, reducing(0, Integer::sum)),
- new GroupedMapAssertion<>(classifier, HashMap.class,
- new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
- // with concurrent
- exerciseMapTabulation(data,
- groupingByConcurrent(classifier, reducing(0, Integer::sum)),
- new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
- new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
-
- // With explicit constructors
- exerciseMapTabulation(data,
- groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
- new GroupedMapAssertion<>(classifier, TreeMap.class,
- new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
- // with concurrent
- exerciseMapTabulation(data,
- groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
- new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
- new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
-
- // Single-level map-reduce
- exerciseMapTabulation(data,
- groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
- new GroupedMapAssertion<>(classifier, HashMap.class,
- new ReduceAssertion<>(0, mDoubler, Integer::sum)));
- // with concurrent
- exerciseMapTabulation(data,
- groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
- new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
- new ReduceAssertion<>(0, mDoubler, Integer::sum)));
-
- // With explicit constructors
- exerciseMapTabulation(data,
- groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
- new GroupedMapAssertion<>(classifier, TreeMap.class,
- new ReduceAssertion<>(0, mDoubler, Integer::sum)));
- // with concurrent
- exerciseMapTabulation(data,
- groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
- new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
- new ReduceAssertion<>(0, mDoubler, Integer::sum)));
- }
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testSimplePartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
- Predicate<Integer> classifier = i -> i % 3 == 0;
-
- // Single-level partition to downstream List
- exerciseMapTabulation(data,
- partitioningBy(classifier),
- new PartitionAssertion<>(classifier, new ListAssertion<>()));
- exerciseMapTabulation(data,
- partitioningBy(classifier, toList()),
- new PartitionAssertion<>(classifier, new ListAssertion<>()));
- }
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testTwoLevelPartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
- Predicate<Integer> classifier = i -> i % 3 == 0;
- Predicate<Integer> classifier2 = i -> i % 7 == 0;
-
- // Two level partition
- exerciseMapTabulation(data,
- partitioningBy(classifier, partitioningBy(classifier2)),
- new PartitionAssertion<>(classifier,
- new PartitionAssertion(classifier2, new ListAssertion<>())));
-
- // Two level partition with reduce
- exerciseMapTabulation(data,
- partitioningBy(classifier, reducing(0, Integer::sum)),
- new PartitionAssertion<>(classifier,
- new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
- }
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
- List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
- List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
- assertEquals(asList, asImmutableList);
- try {
- asImmutableList.add(0);
- fail("Expecting immutable result");
- }
- catch (UnsupportedOperationException ignored) { }
- }
-
-}