# HG changeset patch # User psandoz # Date 1425027505 -3600 # Node ID a2c7a365dde4088fa1fbd0bcdbb088be1dcdcf3e # Parent 874d76e4699dfcd61ae1826c9fe0ddc1610ad598 8071600: Add a flat-mapping collector Reviewed-by: smarks, chegar, briangoetz diff -r 874d76e4699d -r a2c7a365dde4 jdk/src/java.base/share/classes/java/util/stream/Collectors.java --- a/jdk/src/java.base/share/classes/java/util/stream/Collectors.java Wed Jul 05 20:22:22 2017 +0200 +++ b/jdk/src/java.base/share/classes/java/util/stream/Collectors.java Fri Feb 27 09:58:25 2015 +0100 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -404,6 +404,54 @@ } /** + * Adapts a {@code Collector} accepting elements of type {@code U} to one + * accepting elements of type {@code T} by applying a flat mapping function + * to each input element before accumulation. The flat mapping function + * maps an input element to a {@link Stream stream} covering zero or more + * output elements that are then accumulated downstream. Each mapped stream + * is {@link java.util.stream.BaseStream#close() closed} after its contents + * have been placed downstream. (If a mapped stream is {@code null} + * an empty stream is used, instead.) + * + * @apiNote + * The {@code flatMapping()} collectors are most useful when used in a + * multi-level reduction, such as downstream of a {@code groupingBy} or + * {@code partitioningBy}. For example, given a stream of + * {@code Order}, to accumulate the set of line items for each customer: + *
{@code
+     *     Map> itemsByCustomerName
+     *         = orders.stream().collect(groupingBy(Order::getCustomerName,
+     *                                              flatMapping(order -> order.getLineItems().stream(), toSet())));
+     * }
+ * + * @param the type of the input elements + * @param type of elements accepted by downstream collector + * @param intermediate accumulation type of the downstream collector + * @param result type of collector + * @param mapper a function to be applied to the input elements, which + * returns a stream of results + * @param downstream a collector which will receive the elements of the + * stream returned by mapper + * @return a collector which applies the mapping function to the input + * elements and provides the flat mapped results to the downstream collector + * @since 1.9 + */ + public static + Collector flatMapping(Function> mapper, + Collector downstream) { + BiConsumer downstreamAccumulator = downstream.accumulator(); + return new CollectorImpl<>(downstream.supplier(), + (r, t) -> { + try (Stream result = mapper.apply(t)) { + if (result != null) + result.sequential().forEach(u -> downstreamAccumulator.accept(r, u)); + } + }, + downstream.combiner(), downstream.finisher(), + downstream.characteristics()); + } + + /** * Adapts a {@code Collector} to perform an additional finishing * transformation. For example, one could adapt the {@link #toList()} * collector to always produce an immutable list with: diff -r 874d76e4699d -r a2c7a365dde4 jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/CollectorsTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/CollectorsTest.java Fri Feb 27 09:58:25 2015 +0100 @@ -0,0 +1,701 @@ +/* + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package org.openjdk.tests.java.util.stream; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.StringJoiner; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.LambdaTestHelpers; +import java.util.stream.OpTestCase; +import java.util.stream.Stream; +import java.util.stream.StreamOpFlagTestHelper; +import java.util.stream.StreamTestDataProvider; +import java.util.stream.TestData; + +import org.testng.annotations.Test; + +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.flatMapping; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.groupingByConcurrent; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.partitioningBy; +import static java.util.stream.Collectors.reducing; +import static java.util.stream.Collectors.toCollection; +import static java.util.stream.Collectors.toConcurrentMap; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; +import static java.util.stream.LambdaTestHelpers.assertContents; +import static java.util.stream.LambdaTestHelpers.assertContentsUnordered; +import static java.util.stream.LambdaTestHelpers.mDoubler; + +/* + * @test + * @bug 8071600 + * @summary Test for collectors. + */ +public class CollectorsTest extends OpTestCase { + + private static abstract class CollectorAssertion { + abstract void assertValue(U value, + Supplier> source, + boolean ordered) throws ReflectiveOperationException; + } + + static class MappingAssertion extends CollectorAssertion { + private final Function mapper; + private final CollectorAssertion downstream; + + MappingAssertion(Function mapper, CollectorAssertion downstream) { + this.mapper = mapper; + this.downstream = downstream; + } + + @Override + void assertValue(R value, Supplier> source, boolean ordered) throws ReflectiveOperationException { + downstream.assertValue(value, + () -> source.get().map(mapper::apply), + ordered); + } + } + + static class FlatMappingAssertion extends CollectorAssertion { + private final Function> mapper; + private final CollectorAssertion downstream; + + FlatMappingAssertion(Function> mapper, + CollectorAssertion downstream) { + this.mapper = mapper; + this.downstream = downstream; + } + + @Override + void assertValue(R value, Supplier> source, boolean ordered) throws ReflectiveOperationException { + downstream.assertValue(value, + () -> source.get().flatMap(mapper::apply), + ordered); + } + } + + static class GroupingByAssertion> extends CollectorAssertion { + private final Class clazz; + private final Function classifier; + private final CollectorAssertion downstream; + + GroupingByAssertion(Function classifier, Class clazz, + CollectorAssertion downstream) { + this.clazz = clazz; + this.classifier = classifier; + this.downstream = downstream; + } + + @Override + void assertValue(M map, + Supplier> source, + boolean ordered) throws ReflectiveOperationException { + if (!clazz.isAssignableFrom(map.getClass())) + fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass())); + assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet())); + for (Map.Entry entry : map.entrySet()) { + K key = entry.getKey(); + downstream.assertValue(entry.getValue(), + () -> source.get().filter(e -> classifier.apply(e).equals(key)), + ordered); + } + } + } + + static class ToMapAssertion> extends CollectorAssertion { + private final Class clazz; + private final Function keyFn; + private final Function valueFn; + private final BinaryOperator mergeFn; + + ToMapAssertion(Function keyFn, + Function valueFn, + BinaryOperator mergeFn, + Class clazz) { + this.clazz = clazz; + this.keyFn = keyFn; + this.valueFn = valueFn; + this.mergeFn = mergeFn; + } + + @Override + void assertValue(M map, Supplier> source, boolean ordered) throws ReflectiveOperationException { + if (!clazz.isAssignableFrom(map.getClass())) + fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass())); + Set uniqueKeys = source.get().map(keyFn).collect(toSet()); + assertEquals(uniqueKeys, map.keySet()); + source.get().forEach(t -> { + K key = keyFn.apply(t); + V v = source.get() + .filter(e -> key.equals(keyFn.apply(e))) + .map(valueFn) + .reduce(mergeFn) + .get(); + assertEquals(map.get(key), v); + }); + } + } + + static class PartitioningByAssertion extends CollectorAssertion> { + private final Predicate predicate; + private final CollectorAssertion downstream; + + PartitioningByAssertion(Predicate predicate, CollectorAssertion downstream) { + this.predicate = predicate; + this.downstream = downstream; + } + + @Override + void assertValue(Map map, + Supplier> source, + boolean ordered) throws ReflectiveOperationException { + if (!Map.class.isAssignableFrom(map.getClass())) + fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass())); + assertEquals(2, map.size()); + downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered); + downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered); + } + } + + static class ToListAssertion extends CollectorAssertion> { + @Override + void assertValue(List value, Supplier> source, boolean ordered) + throws ReflectiveOperationException { + if (!List.class.isAssignableFrom(value.getClass())) + fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass())); + Stream stream = source.get(); + List result = new ArrayList<>(); + for (Iterator it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add + result.add(it.next()); + if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered) + assertContents(value, result); + else + assertContentsUnordered(value, result); + } + } + + static class ToCollectionAssertion extends CollectorAssertion> { + private final Class clazz; + private final boolean targetOrdered; + + ToCollectionAssertion(Class clazz, boolean targetOrdered) { + this.clazz = clazz; + this.targetOrdered = targetOrdered; + } + + @Override + void assertValue(Collection value, Supplier> source, boolean ordered) + throws ReflectiveOperationException { + if (!clazz.isAssignableFrom(value.getClass())) + fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass())); + Stream stream = source.get(); + Collection result = clazz.newInstance(); + for (Iterator it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add + result.add(it.next()); + if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered) + assertContents(value, result); + else + assertContentsUnordered(value, result); + } + } + + static class ReducingAssertion extends CollectorAssertion { + private final U identity; + private final Function mapper; + private final BinaryOperator reducer; + + ReducingAssertion(U identity, Function mapper, BinaryOperator reducer) { + this.identity = identity; + this.mapper = mapper; + this.reducer = reducer; + } + + @Override + void assertValue(U value, Supplier> source, boolean ordered) + throws ReflectiveOperationException { + Optional reduced = source.get().map(mapper).reduce(reducer); + if (value == null) + assertTrue(!reduced.isPresent()); + else if (!reduced.isPresent()) { + assertEquals(value, identity); + } + else { + assertEquals(value, reduced.get()); + } + } + } + + private ResultAsserter mapTabulationAsserter(boolean ordered) { + return (act, exp, ord, par) -> { + if (par && (!ordered || !ord)) { + CollectorsTest.nestedMapEqualityAssertion(act, exp); + } + else { + LambdaTestHelpers.assertContentsEqual(act, exp); + } + }; + } + + private + void exerciseMapCollection(TestData> data, + Collector collector, + CollectorAssertion assertion) + throws ReflectiveOperationException { + boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED); + + M m = withData(data) + .terminal(s -> s.collect(collector)) + .resultAsserter(mapTabulationAsserter(ordered)) + .exercise(); + assertion.assertValue(m, () -> data.stream(), ordered); + + m = withData(data) + .terminal(s -> s.unordered().collect(collector)) + .resultAsserter(mapTabulationAsserter(ordered)) + .exercise(); + assertion.assertValue(m, () -> data.stream(), false); + } + + private static void nestedMapEqualityAssertion(Object o1, Object o2) { + if (o1 instanceof Map) { + Map m1 = (Map) o1; + Map m2 = (Map) o2; + assertContentsUnordered(m1.keySet(), m2.keySet()); + for (Object k : m1.keySet()) + nestedMapEqualityAssertion(m1.get(k), m2.get(k)); + } + else if (o1 instanceof Collection) { + assertContentsUnordered(((Collection) o1), ((Collection) o2)); + } + else + assertEquals(o1, o2); + } + + private void assertCollect(TestData.OfRef data, + Collector collector, + Function, R> streamReduction) { + R check = streamReduction.apply(data.stream()); + withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise(); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testReducing(String name, TestData.OfRef data) throws ReflectiveOperationException { + assertCollect(data, Collectors.reducing(0, Integer::sum), + s -> s.reduce(0, Integer::sum)); + assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min), + s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE)); + assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max), + s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE)); + + assertCollect(data, Collectors.reducing(Integer::sum), + s -> s.reduce(Integer::sum)); + assertCollect(data, Collectors.minBy(Comparator.naturalOrder()), + s -> s.min(Integer::compare)); + assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()), + s -> s.max(Integer::compare)); + + assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum), + s -> s.map(x -> x*2).reduce(0, Integer::sum)); + + assertCollect(data, Collectors.summingLong(x -> x * 2L), + s -> s.map(x -> x*2L).reduce(0L, Long::sum)); + assertCollect(data, Collectors.summingInt(x -> x * 2), + s -> s.map(x -> x*2).reduce(0, Integer::sum)); + assertCollect(data, Collectors.summingDouble(x -> x * 2.0d), + s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum)); + + assertCollect(data, Collectors.averagingInt(x -> x * 2), + s -> s.mapToInt(x -> x * 2).average().orElse(0)); + assertCollect(data, Collectors.averagingLong(x -> x * 2), + s -> s.mapToLong(x -> x * 2).average().orElse(0)); + assertCollect(data, Collectors.averagingDouble(x -> x * 2), + s -> s.mapToDouble(x -> x * 2).average().orElse(0)); + + // Test explicit Collector.of + Collector avg2xint = Collector.of(() -> new long[2], + (a, b) -> { + a[0] += b * 2; + a[1]++; + }, + (a, b) -> { + a[0] += b[0]; + a[1] += b[1]; + return a; + }, + a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]); + assertCollect(data, avg2xint, + s -> s.mapToInt(x -> x * 2).average().orElse(0)); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testJoining(String name, TestData.OfRef data) throws ReflectiveOperationException { + withData(data) + .terminal(s -> s.map(Object::toString).collect(Collectors.joining())) + .expectedResult(join(data, "")) + .exercise(); + + Collector likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString); + withData(data) + .terminal(s -> s.map(Object::toString).collect(likeJoining)) + .expectedResult(join(data, "")) + .exercise(); + + withData(data) + .terminal(s -> s.map(Object::toString).collect(Collectors.joining(","))) + .expectedResult(join(data, ",")) + .exercise(); + + withData(data) + .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]"))) + .expectedResult("[" + join(data, ",") + "]") + .exercise(); + + withData(data) + .terminal(s -> s.map(Object::toString) + .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append) + .toString()) + .expectedResult(join(data, "")) + .exercise(); + + withData(data) + .terminal(s -> s.map(Object::toString) + .collect(() -> new StringJoiner(","), + (sj, cs) -> sj.add(cs), + (j1, j2) -> j1.merge(j2)) + .toString()) + .expectedResult(join(data, ",")) + .exercise(); + + withData(data) + .terminal(s -> s.map(Object::toString) + .collect(() -> new StringJoiner(",", "[", "]"), + (sj, cs) -> sj.add(cs), + (j1, j2) -> j1.merge(j2)) + .toString()) + .expectedResult("[" + join(data, ",") + "]") + .exercise(); + } + + private String join(TestData.OfRef data, String delim) { + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (T i : data) { + if (!first) + sb.append(delim); + sb.append(i.toString()); + first = false; + } + return sb.toString(); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testSimpleToMap(String name, TestData.OfRef data) throws ReflectiveOperationException { + Function keyFn = i -> i * 2; + Function valueFn = i -> i * 4; + + List dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new)); + Set dataAsSet = new HashSet<>(dataAsList); + + BinaryOperator sum = Integer::sum; + for (BinaryOperator op : Arrays.asList((u, v) -> u, + (u, v) -> v, + sum)) { + try { + exerciseMapCollection(data, toMap(keyFn, valueFn), + new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class)); + if (dataAsList.size() != dataAsSet.size()) + fail("Expected ISE on input with duplicates"); + } + catch (IllegalStateException e) { + if (dataAsList.size() == dataAsSet.size()) + fail("Expected no ISE on input without duplicates"); + } + + exerciseMapCollection(data, toMap(keyFn, valueFn, op), + new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class)); + + exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new), + new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class)); + } + + // For concurrent maps, only use commutative merge functions + try { + exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn), + new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class)); + if (dataAsList.size() != dataAsSet.size()) + fail("Expected ISE on input with duplicates"); + } + catch (IllegalStateException e) { + if (dataAsList.size() == dataAsSet.size()) + fail("Expected no ISE on input without duplicates"); + } + + exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum), + new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class)); + + exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new), + new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class)); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testSimpleGroupingBy(String name, TestData.OfRef data) throws ReflectiveOperationException { + Function classifier = i -> i % 3; + + // Single-level groupBy + exerciseMapCollection(data, groupingBy(classifier), + new GroupingByAssertion<>(classifier, HashMap.class, + new ToListAssertion<>())); + exerciseMapCollection(data, groupingByConcurrent(classifier), + new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, + new ToListAssertion<>())); + + // With explicit constructors + exerciseMapCollection(data, + groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)), + new GroupingByAssertion<>(classifier, TreeMap.class, + new ToCollectionAssertion(HashSet.class, false))); + exerciseMapCollection(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new, + toCollection(HashSet::new)), + new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, + new ToCollectionAssertion(HashSet.class, false))); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testGroupingByWithMapping(String name, TestData.OfRef data) throws ReflectiveOperationException { + Function classifier = i -> i % 3; + Function mapper = i -> i * 2; + + exerciseMapCollection(data, + groupingBy(classifier, mapping(mapper, toList())), + new GroupingByAssertion<>(classifier, HashMap.class, + new MappingAssertion<>(mapper, + new ToListAssertion<>()))); + } + + @Test + public void testFlatMappingClose() { + Function classifier = i -> i; + AtomicInteger ai = new AtomicInteger(); + Function> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement); + Map> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList()))); + assertEquals(m.size(), ai.get()); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testGroupingByWithFlatMapping(String name, TestData.OfRef data) throws ReflectiveOperationException { + Function classifier = i -> i % 3; + Function> flatMapperByNull = i -> null; + Function> flatMapperBy0 = i -> Stream.empty(); + Function> flatMapperBy2 = i -> Stream.of(i, i); + + exerciseMapCollection(data, + groupingBy(classifier, flatMapping(flatMapperByNull, toList())), + new GroupingByAssertion<>(classifier, HashMap.class, + new FlatMappingAssertion<>(flatMapperBy0, + new ToListAssertion<>()))); + exerciseMapCollection(data, + groupingBy(classifier, flatMapping(flatMapperBy0, toList())), + new GroupingByAssertion<>(classifier, HashMap.class, + new FlatMappingAssertion<>(flatMapperBy0, + new ToListAssertion<>()))); + exerciseMapCollection(data, + groupingBy(classifier, flatMapping(flatMapperBy2, toList())), + new GroupingByAssertion<>(classifier, HashMap.class, + new FlatMappingAssertion<>(flatMapperBy2, + new ToListAssertion<>()))); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testTwoLevelGroupingBy(String name, TestData.OfRef data) throws ReflectiveOperationException { + Function classifier = i -> i % 6; + Function classifier2 = i -> i % 23; + + // Two-level groupBy + exerciseMapCollection(data, + groupingBy(classifier, groupingBy(classifier2)), + new GroupingByAssertion<>(classifier, HashMap.class, + new GroupingByAssertion<>(classifier2, HashMap.class, + new ToListAssertion<>()))); + // with concurrent as upstream + exerciseMapCollection(data, + groupingByConcurrent(classifier, groupingBy(classifier2)), + new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, + new GroupingByAssertion<>(classifier2, HashMap.class, + new ToListAssertion<>()))); + // with concurrent as downstream + exerciseMapCollection(data, + groupingBy(classifier, groupingByConcurrent(classifier2)), + new GroupingByAssertion<>(classifier, HashMap.class, + new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class, + new ToListAssertion<>()))); + // with concurrent as upstream and downstream + exerciseMapCollection(data, + groupingByConcurrent(classifier, groupingByConcurrent(classifier2)), + new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, + new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class, + new ToListAssertion<>()))); + + // With explicit constructors + exerciseMapCollection(data, + groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))), + new GroupingByAssertion<>(classifier, TreeMap.class, + new GroupingByAssertion<>(classifier2, TreeMap.class, + new ToCollectionAssertion(HashSet.class, false)))); + // with concurrent as upstream + exerciseMapCollection(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())), + new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, + new GroupingByAssertion<>(classifier2, TreeMap.class, + new ToListAssertion<>()))); + // with concurrent as downstream + exerciseMapCollection(data, + groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())), + new GroupingByAssertion<>(classifier, TreeMap.class, + new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class, + new ToListAssertion<>()))); + // with concurrent as upstream and downstream + exerciseMapCollection(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())), + new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, + new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class, + new ToListAssertion<>()))); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testGroupubgByWithReducing(String name, TestData.OfRef data) throws ReflectiveOperationException { + Function classifier = i -> i % 3; + + // Single-level simple reduce + exerciseMapCollection(data, + groupingBy(classifier, reducing(0, Integer::sum)), + new GroupingByAssertion<>(classifier, HashMap.class, + new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); + // with concurrent + exerciseMapCollection(data, + groupingByConcurrent(classifier, reducing(0, Integer::sum)), + new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, + new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); + + // With explicit constructors + exerciseMapCollection(data, + groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)), + new GroupingByAssertion<>(classifier, TreeMap.class, + new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); + // with concurrent + exerciseMapCollection(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)), + new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, + new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); + + // Single-level map-reduce + exerciseMapCollection(data, + groupingBy(classifier, reducing(0, mDoubler, Integer::sum)), + new GroupingByAssertion<>(classifier, HashMap.class, + new ReducingAssertion<>(0, mDoubler, Integer::sum))); + // with concurrent + exerciseMapCollection(data, + groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)), + new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, + new ReducingAssertion<>(0, mDoubler, Integer::sum))); + + // With explicit constructors + exerciseMapCollection(data, + groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)), + new GroupingByAssertion<>(classifier, TreeMap.class, + new ReducingAssertion<>(0, mDoubler, Integer::sum))); + // with concurrent + exerciseMapCollection(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)), + new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, + new ReducingAssertion<>(0, mDoubler, Integer::sum))); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testSimplePartitioningBy(String name, TestData.OfRef data) throws ReflectiveOperationException { + Predicate classifier = i -> i % 3 == 0; + + // Single-level partition to downstream List + exerciseMapCollection(data, + partitioningBy(classifier), + new PartitioningByAssertion<>(classifier, new ToListAssertion<>())); + exerciseMapCollection(data, + partitioningBy(classifier, toList()), + new PartitioningByAssertion<>(classifier, new ToListAssertion<>())); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testTwoLevelPartitioningBy(String name, TestData.OfRef data) throws ReflectiveOperationException { + Predicate classifier = i -> i % 3 == 0; + Predicate classifier2 = i -> i % 7 == 0; + + // Two level partition + exerciseMapCollection(data, + partitioningBy(classifier, partitioningBy(classifier2)), + new PartitioningByAssertion<>(classifier, + new PartitioningByAssertion(classifier2, new ToListAssertion<>()))); + + // Two level partition with reduce + exerciseMapCollection(data, + partitioningBy(classifier, reducing(0, Integer::sum)), + new PartitioningByAssertion<>(classifier, + new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); + } + + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) + public void testComposeFinisher(String name, TestData.OfRef data) throws ReflectiveOperationException { + List asList = exerciseTerminalOps(data, s -> s.collect(toList())); + List asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList))); + assertEquals(asList, asImmutableList); + try { + asImmutableList.add(0); + fail("Expecting immutable result"); + } + catch (UnsupportedOperationException ignored) { } + } + +} diff -r 874d76e4699d -r a2c7a365dde4 jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java --- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java Wed Jul 05 20:22:22 2017 +0200 +++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java Fri Feb 27 09:58:25 2015 +0100 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -34,8 +34,19 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import java.util.stream.*; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.DoubleStreamTestDataProvider; +import java.util.stream.IntStream; +import java.util.stream.IntStreamTestDataProvider; +import java.util.stream.LongStream; +import java.util.stream.LongStreamTestDataProvider; +import java.util.stream.OpTestCase; +import java.util.stream.Stream; +import java.util.stream.StreamTestDataProvider; +import java.util.stream.TestData; import static java.util.stream.LambdaTestHelpers.*; import static java.util.stream.ThowableHelper.checkNPE; @@ -66,6 +77,59 @@ exerciseOps(TestData.Factory.ofArray("LONG_STRING", new String[] {LONG_STRING}), s -> s.flatMap(flattenChars)); } + @Test + public void testClose() { + AtomicInteger before = new AtomicInteger(); + AtomicInteger onClose = new AtomicInteger(); + + Supplier> s = () -> { + before.set(0); onClose.set(0); + return Stream.of(1, 2).peek(e -> before.getAndIncrement()); + }; + + s.get().flatMap(i -> Stream.of(i, i).onClose(onClose::getAndIncrement)).count(); + assertEquals(before.get(), onClose.get()); + + s.get().flatMapToInt(i -> IntStream.of(i, i).onClose(onClose::getAndIncrement)).count(); + assertEquals(before.get(), onClose.get()); + + s.get().flatMapToLong(i -> LongStream.of(i, i).onClose(onClose::getAndIncrement)).count(); + assertEquals(before.get(), onClose.get()); + + s.get().flatMapToDouble(i -> DoubleStream.of(i, i).onClose(onClose::getAndIncrement)).count(); + assertEquals(before.get(), onClose.get()); + } + + @Test + public void testIntClose() { + AtomicInteger before = new AtomicInteger(); + AtomicInteger onClose = new AtomicInteger(); + + IntStream.of(1, 2).peek(e -> before.getAndIncrement()). + flatMap(i -> IntStream.of(i, i).onClose(onClose::getAndIncrement)).count(); + assertEquals(before.get(), onClose.get()); + } + + @Test + public void testLongClose() { + AtomicInteger before = new AtomicInteger(); + AtomicInteger onClose = new AtomicInteger(); + + LongStream.of(1, 2).peek(e -> before.getAndIncrement()). + flatMap(i -> LongStream.of(i, i).onClose(onClose::getAndIncrement)).count(); + assertEquals(before.get(), onClose.get()); + } + + @Test + public void testDoubleClose() { + AtomicInteger before = new AtomicInteger(); + AtomicInteger onClose = new AtomicInteger(); + + DoubleStream.of(1, 2).peek(e -> before.getAndIncrement()). + flatMap(i -> DoubleStream.of(i, i).onClose(onClose::getAndIncrement)).count(); + assertEquals(before.get(), onClose.get()); + } + @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, TestData.OfRef data) { Collection result = exerciseOps(data, s -> s.flatMap(mfId)); diff -r 874d76e4699d -r a2c7a365dde4 jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/TabulatorsTest.java --- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/TabulatorsTest.java Wed Jul 05 20:22:22 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,621 +0,0 @@ -/* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package org.openjdk.tests.java.util.stream; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.StringJoiner; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collector; -import java.util.stream.Collectors; -import java.util.stream.LambdaTestHelpers; -import java.util.stream.OpTestCase; -import java.util.stream.Stream; -import java.util.stream.StreamOpFlagTestHelper; -import java.util.stream.StreamTestDataProvider; -import java.util.stream.TestData; - -import org.testng.annotations.Test; - -import static java.util.stream.Collectors.collectingAndThen; -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.groupingByConcurrent; -import static java.util.stream.Collectors.partitioningBy; -import static java.util.stream.Collectors.reducing; -import static java.util.stream.Collectors.toCollection; -import static java.util.stream.Collectors.toConcurrentMap; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; -import static java.util.stream.Collectors.toSet; -import static java.util.stream.LambdaTestHelpers.assertContents; -import static java.util.stream.LambdaTestHelpers.assertContentsUnordered; -import static java.util.stream.LambdaTestHelpers.mDoubler; - -/** - * TabulatorsTest - * - * @author Brian Goetz - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class TabulatorsTest extends OpTestCase { - - private static abstract class TabulationAssertion { - abstract void assertValue(U value, - Supplier> source, - boolean ordered) throws ReflectiveOperationException; - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - static class GroupedMapAssertion> extends TabulationAssertion { - private final Class clazz; - private final Function classifier; - private final TabulationAssertion downstream; - - protected GroupedMapAssertion(Function classifier, - Class clazz, - TabulationAssertion downstream) { - this.clazz = clazz; - this.classifier = classifier; - this.downstream = downstream; - } - - void assertValue(M map, - Supplier> source, - boolean ordered) throws ReflectiveOperationException { - if (!clazz.isAssignableFrom(map.getClass())) - fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass())); - assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet())); - for (Map.Entry entry : map.entrySet()) { - K key = entry.getKey(); - downstream.assertValue(entry.getValue(), - () -> source.get().filter(e -> classifier.apply(e).equals(key)), - ordered); - } - } - } - - static class ToMapAssertion> extends TabulationAssertion { - private final Class clazz; - private final Function keyFn; - private final Function valueFn; - private final BinaryOperator mergeFn; - - ToMapAssertion(Function keyFn, - Function valueFn, - BinaryOperator mergeFn, - Class clazz) { - this.clazz = clazz; - this.keyFn = keyFn; - this.valueFn = valueFn; - this.mergeFn = mergeFn; - } - - @Override - void assertValue(M map, Supplier> source, boolean ordered) throws ReflectiveOperationException { - Set uniqueKeys = source.get().map(keyFn).collect(toSet()); - assertTrue(clazz.isAssignableFrom(map.getClass())); - assertEquals(uniqueKeys, map.keySet()); - source.get().forEach(t -> { - K key = keyFn.apply(t); - V v = source.get() - .filter(e -> key.equals(keyFn.apply(e))) - .map(valueFn) - .reduce(mergeFn) - .get(); - assertEquals(map.get(key), v); - }); - } - } - - static class PartitionAssertion extends TabulationAssertion> { - private final Predicate predicate; - private final TabulationAssertion downstream; - - protected PartitionAssertion(Predicate predicate, - TabulationAssertion downstream) { - this.predicate = predicate; - this.downstream = downstream; - } - - void assertValue(Map map, - Supplier> source, - boolean ordered) throws ReflectiveOperationException { - if (!Map.class.isAssignableFrom(map.getClass())) - fail(String.format("Class mismatch in PartitionAssertion: %s", map.getClass())); - assertEquals(2, map.size()); - downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered); - downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered); - } - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - static class ListAssertion extends TabulationAssertion> { - @Override - void assertValue(List value, Supplier> source, boolean ordered) - throws ReflectiveOperationException { - if (!List.class.isAssignableFrom(value.getClass())) - fail(String.format("Class mismatch in ListAssertion: %s", value.getClass())); - Stream stream = source.get(); - List result = new ArrayList<>(); - for (Iterator it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add - result.add(it.next()); - if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered) - assertContents(value, result); - else - assertContentsUnordered(value, result); - } - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - static class CollectionAssertion extends TabulationAssertion> { - private final Class clazz; - private final boolean targetOrdered; - - protected CollectionAssertion(Class clazz, boolean targetOrdered) { - this.clazz = clazz; - this.targetOrdered = targetOrdered; - } - - @Override - void assertValue(Collection value, Supplier> source, boolean ordered) - throws ReflectiveOperationException { - if (!clazz.isAssignableFrom(value.getClass())) - fail(String.format("Class mismatch in CollectionAssertion: %s, %s", clazz, value.getClass())); - Stream stream = source.get(); - Collection result = clazz.newInstance(); - for (Iterator it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add - result.add(it.next()); - if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered) - assertContents(value, result); - else - assertContentsUnordered(value, result); - } - } - - static class ReduceAssertion extends TabulationAssertion { - private final U identity; - private final Function mapper; - private final BinaryOperator reducer; - - ReduceAssertion(U identity, Function mapper, BinaryOperator reducer) { - this.identity = identity; - this.mapper = mapper; - this.reducer = reducer; - } - - @Override - void assertValue(U value, Supplier> source, boolean ordered) - throws ReflectiveOperationException { - Optional reduced = source.get().map(mapper).reduce(reducer); - if (value == null) - assertTrue(!reduced.isPresent()); - else if (!reduced.isPresent()) { - assertEquals(value, identity); - } - else { - assertEquals(value, reduced.get()); - } - } - } - - private ResultAsserter mapTabulationAsserter(boolean ordered) { - return (act, exp, ord, par) -> { - if (par && (!ordered || !ord)) { - TabulatorsTest.nestedMapEqualityAssertion(act, exp); - } - else { - LambdaTestHelpers.assertContentsEqual(act, exp); - } - }; - } - - private - void exerciseMapTabulation(TestData> data, - Collector collector, - TabulationAssertion assertion) - throws ReflectiveOperationException { - boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED); - - M m = withData(data) - .terminal(s -> s.collect(collector)) - .resultAsserter(mapTabulationAsserter(ordered)) - .exercise(); - assertion.assertValue(m, () -> data.stream(), ordered); - - m = withData(data) - .terminal(s -> s.unordered().collect(collector)) - .resultAsserter(mapTabulationAsserter(ordered)) - .exercise(); - assertion.assertValue(m, () -> data.stream(), false); - } - - private static void nestedMapEqualityAssertion(Object o1, Object o2) { - if (o1 instanceof Map) { - Map m1 = (Map) o1; - Map m2 = (Map) o2; - assertContentsUnordered(m1.keySet(), m2.keySet()); - for (Object k : m1.keySet()) - nestedMapEqualityAssertion(m1.get(k), m2.get(k)); - } - else if (o1 instanceof Collection) { - assertContentsUnordered(((Collection) o1), ((Collection) o2)); - } - else - assertEquals(o1, o2); - } - - private void assertCollect(TestData.OfRef data, - Collector collector, - Function, R> streamReduction) { - R check = streamReduction.apply(data.stream()); - withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise(); - } - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testReduce(String name, TestData.OfRef data) throws ReflectiveOperationException { - assertCollect(data, Collectors.reducing(0, Integer::sum), - s -> s.reduce(0, Integer::sum)); - assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min), - s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE)); - assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max), - s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE)); - - assertCollect(data, Collectors.reducing(Integer::sum), - s -> s.reduce(Integer::sum)); - assertCollect(data, Collectors.minBy(Comparator.naturalOrder()), - s -> s.min(Integer::compare)); - assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()), - s -> s.max(Integer::compare)); - - assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum), - s -> s.map(x -> x*2).reduce(0, Integer::sum)); - - assertCollect(data, Collectors.summingLong(x -> x * 2L), - s -> s.map(x -> x*2L).reduce(0L, Long::sum)); - assertCollect(data, Collectors.summingInt(x -> x * 2), - s -> s.map(x -> x*2).reduce(0, Integer::sum)); - assertCollect(data, Collectors.summingDouble(x -> x * 2.0d), - s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum)); - - assertCollect(data, Collectors.averagingInt(x -> x * 2), - s -> s.mapToInt(x -> x * 2).average().orElse(0)); - assertCollect(data, Collectors.averagingLong(x -> x * 2), - s -> s.mapToLong(x -> x * 2).average().orElse(0)); - assertCollect(data, Collectors.averagingDouble(x -> x * 2), - s -> s.mapToDouble(x -> x * 2).average().orElse(0)); - - // Test explicit Collector.of - Collector avg2xint = Collector.of(() -> new long[2], - (a, b) -> { - a[0] += b * 2; - a[1]++; - }, - (a, b) -> { - a[0] += b[0]; - a[1] += b[1]; - return a; - }, - a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]); - assertCollect(data, avg2xint, - s -> s.mapToInt(x -> x * 2).average().orElse(0)); - } - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testJoin(String name, TestData.OfRef data) throws ReflectiveOperationException { - withData(data) - .terminal(s -> s.map(Object::toString).collect(Collectors.joining())) - .expectedResult(join(data, "")) - .exercise(); - - Collector likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString); - withData(data) - .terminal(s -> s.map(Object::toString).collect(likeJoining)) - .expectedResult(join(data, "")) - .exercise(); - - withData(data) - .terminal(s -> s.map(Object::toString).collect(Collectors.joining(","))) - .expectedResult(join(data, ",")) - .exercise(); - - withData(data) - .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]"))) - .expectedResult("[" + join(data, ",") + "]") - .exercise(); - - withData(data) - .terminal(s -> s.map(Object::toString) - .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append) - .toString()) - .expectedResult(join(data, "")) - .exercise(); - - withData(data) - .terminal(s -> s.map(Object::toString) - .collect(() -> new StringJoiner(","), - (sj, cs) -> sj.add(cs), - (j1, j2) -> j1.merge(j2)) - .toString()) - .expectedResult(join(data, ",")) - .exercise(); - - withData(data) - .terminal(s -> s.map(Object::toString) - .collect(() -> new StringJoiner(",", "[", "]"), - (sj, cs) -> sj.add(cs), - (j1, j2) -> j1.merge(j2)) - .toString()) - .expectedResult("[" + join(data, ",") + "]") - .exercise(); - } - - private String join(TestData.OfRef data, String delim) { - StringBuilder sb = new StringBuilder(); - boolean first = true; - for (T i : data) { - if (!first) - sb.append(delim); - sb.append(i.toString()); - first = false; - } - return sb.toString(); - } - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testSimpleToMap(String name, TestData.OfRef data) throws ReflectiveOperationException { - Function keyFn = i -> i * 2; - Function valueFn = i -> i * 4; - - List dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new)); - Set dataAsSet = new HashSet<>(dataAsList); - - BinaryOperator sum = Integer::sum; - for (BinaryOperator op : Arrays.asList((u, v) -> u, - (u, v) -> v, - sum)) { - try { - exerciseMapTabulation(data, toMap(keyFn, valueFn), - new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class)); - if (dataAsList.size() != dataAsSet.size()) - fail("Expected ISE on input with duplicates"); - } - catch (IllegalStateException e) { - if (dataAsList.size() == dataAsSet.size()) - fail("Expected no ISE on input without duplicates"); - } - - exerciseMapTabulation(data, toMap(keyFn, valueFn, op), - new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class)); - - exerciseMapTabulation(data, toMap(keyFn, valueFn, op, TreeMap::new), - new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class)); - } - - // For concurrent maps, only use commutative merge functions - try { - exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn), - new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class)); - if (dataAsList.size() != dataAsSet.size()) - fail("Expected ISE on input with duplicates"); - } - catch (IllegalStateException e) { - if (dataAsList.size() == dataAsSet.size()) - fail("Expected no ISE on input without duplicates"); - } - - exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum), - new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class)); - - exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new), - new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class)); - } - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testSimpleGroupBy(String name, TestData.OfRef data) throws ReflectiveOperationException { - Function classifier = i -> i % 3; - - // Single-level groupBy - exerciseMapTabulation(data, groupingBy(classifier), - new GroupedMapAssertion<>(classifier, HashMap.class, - new ListAssertion<>())); - exerciseMapTabulation(data, groupingByConcurrent(classifier), - new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, - new ListAssertion<>())); - - // With explicit constructors - exerciseMapTabulation(data, - groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)), - new GroupedMapAssertion<>(classifier, TreeMap.class, - new CollectionAssertion(HashSet.class, false))); - exerciseMapTabulation(data, - groupingByConcurrent(classifier, ConcurrentSkipListMap::new, - toCollection(HashSet::new)), - new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, - new CollectionAssertion(HashSet.class, false))); - } - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testTwoLevelGroupBy(String name, TestData.OfRef data) throws ReflectiveOperationException { - Function classifier = i -> i % 6; - Function classifier2 = i -> i % 23; - - // Two-level groupBy - exerciseMapTabulation(data, - groupingBy(classifier, groupingBy(classifier2)), - new GroupedMapAssertion<>(classifier, HashMap.class, - new GroupedMapAssertion<>(classifier2, HashMap.class, - new ListAssertion<>()))); - // with concurrent as upstream - exerciseMapTabulation(data, - groupingByConcurrent(classifier, groupingBy(classifier2)), - new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, - new GroupedMapAssertion<>(classifier2, HashMap.class, - new ListAssertion<>()))); - // with concurrent as downstream - exerciseMapTabulation(data, - groupingBy(classifier, groupingByConcurrent(classifier2)), - new GroupedMapAssertion<>(classifier, HashMap.class, - new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class, - new ListAssertion<>()))); - // with concurrent as upstream and downstream - exerciseMapTabulation(data, - groupingByConcurrent(classifier, groupingByConcurrent(classifier2)), - new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, - new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class, - new ListAssertion<>()))); - - // With explicit constructors - exerciseMapTabulation(data, - groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))), - new GroupedMapAssertion<>(classifier, TreeMap.class, - new GroupedMapAssertion<>(classifier2, TreeMap.class, - new CollectionAssertion(HashSet.class, false)))); - // with concurrent as upstream - exerciseMapTabulation(data, - groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())), - new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, - new GroupedMapAssertion<>(classifier2, TreeMap.class, - new ListAssertion<>()))); - // with concurrent as downstream - exerciseMapTabulation(data, - groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())), - new GroupedMapAssertion<>(classifier, TreeMap.class, - new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class, - new ListAssertion<>()))); - // with concurrent as upstream and downstream - exerciseMapTabulation(data, - groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())), - new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, - new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class, - new ListAssertion<>()))); - } - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testGroupedReduce(String name, TestData.OfRef data) throws ReflectiveOperationException { - Function classifier = i -> i % 3; - - // Single-level simple reduce - exerciseMapTabulation(data, - groupingBy(classifier, reducing(0, Integer::sum)), - new GroupedMapAssertion<>(classifier, HashMap.class, - new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); - // with concurrent - exerciseMapTabulation(data, - groupingByConcurrent(classifier, reducing(0, Integer::sum)), - new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, - new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); - - // With explicit constructors - exerciseMapTabulation(data, - groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)), - new GroupedMapAssertion<>(classifier, TreeMap.class, - new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); - // with concurrent - exerciseMapTabulation(data, - groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)), - new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, - new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); - - // Single-level map-reduce - exerciseMapTabulation(data, - groupingBy(classifier, reducing(0, mDoubler, Integer::sum)), - new GroupedMapAssertion<>(classifier, HashMap.class, - new ReduceAssertion<>(0, mDoubler, Integer::sum))); - // with concurrent - exerciseMapTabulation(data, - groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)), - new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, - new ReduceAssertion<>(0, mDoubler, Integer::sum))); - - // With explicit constructors - exerciseMapTabulation(data, - groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)), - new GroupedMapAssertion<>(classifier, TreeMap.class, - new ReduceAssertion<>(0, mDoubler, Integer::sum))); - // with concurrent - exerciseMapTabulation(data, - groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)), - new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, - new ReduceAssertion<>(0, mDoubler, Integer::sum))); - } - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testSimplePartition(String name, TestData.OfRef data) throws ReflectiveOperationException { - Predicate classifier = i -> i % 3 == 0; - - // Single-level partition to downstream List - exerciseMapTabulation(data, - partitioningBy(classifier), - new PartitionAssertion<>(classifier, new ListAssertion<>())); - exerciseMapTabulation(data, - partitioningBy(classifier, toList()), - new PartitionAssertion<>(classifier, new ListAssertion<>())); - } - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testTwoLevelPartition(String name, TestData.OfRef data) throws ReflectiveOperationException { - Predicate classifier = i -> i % 3 == 0; - Predicate classifier2 = i -> i % 7 == 0; - - // Two level partition - exerciseMapTabulation(data, - partitioningBy(classifier, partitioningBy(classifier2)), - new PartitionAssertion<>(classifier, - new PartitionAssertion(classifier2, new ListAssertion<>()))); - - // Two level partition with reduce - exerciseMapTabulation(data, - partitioningBy(classifier, reducing(0, Integer::sum)), - new PartitionAssertion<>(classifier, - new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); - } - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testComposeFinisher(String name, TestData.OfRef data) throws ReflectiveOperationException { - List asList = exerciseTerminalOps(data, s -> s.collect(toList())); - List asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList))); - assertEquals(asList, asImmutableList); - try { - asImmutableList.add(0); - fail("Expecting immutable result"); - } - catch (UnsupportedOperationException ignored) { } - } - -}