jdk/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java
changeset 33922 edbb9a685407
parent 33921 935f3f7e43b0
parent 33903 393dc9cd55b6
child 33923 25a2cab05cfb
equal deleted inserted replaced
33921:935f3f7e43b0 33922:edbb9a685407
     1 /*
       
     2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 package java.util.stream;
       
    24 
       
    25 import java.io.PrintWriter;
       
    26 import java.io.StringWriter;
       
    27 import java.util.ArrayList;
       
    28 import java.util.Arrays;
       
    29 import java.util.Collection;
       
    30 import java.util.Collections;
       
    31 import java.util.EnumMap;
       
    32 import java.util.EnumSet;
       
    33 import java.util.HashMap;
       
    34 import java.util.HashSet;
       
    35 import java.util.List;
       
    36 import java.util.Map;
       
    37 import java.util.Objects;
       
    38 import java.util.Set;
       
    39 import java.util.Spliterator;
       
    40 import java.util.function.BiConsumer;
       
    41 import java.util.function.Consumer;
       
    42 import java.util.function.Function;
       
    43 
       
    44 import org.testng.annotations.Test;
       
    45 
       
    46 /**
       
    47  * Base class for streams test cases.  Provides 'exercise' methods for taking
       
    48  * lambdas that construct and modify streams, and evaluates them in different
       
    49  * ways and asserts that they produce equivalent results.
       
    50  */
       
    51 @Test
       
    52 public abstract class OpTestCase extends LoggingTestCase {
       
    53 
       
    54     private final Map<StreamShape, Set<? extends BaseStreamTestScenario>> testScenarios;
       
    55 
       
    56     protected OpTestCase() {
       
    57         testScenarios = new EnumMap<>(StreamShape.class);
       
    58         testScenarios.put(StreamShape.REFERENCE, Collections.unmodifiableSet(EnumSet.allOf(StreamTestScenario.class)));
       
    59         testScenarios.put(StreamShape.INT_VALUE, Collections.unmodifiableSet(EnumSet.allOf(IntStreamTestScenario.class)));
       
    60         testScenarios.put(StreamShape.LONG_VALUE, Collections.unmodifiableSet(EnumSet.allOf(LongStreamTestScenario.class)));
       
    61         testScenarios.put(StreamShape.DOUBLE_VALUE, Collections.unmodifiableSet(EnumSet.allOf(DoubleStreamTestScenario.class)));
       
    62     }
       
    63 
       
    64     @SuppressWarnings("rawtypes")
       
    65     public static int getStreamFlags(BaseStream s) {
       
    66         return ((AbstractPipeline) s).getStreamFlags();
       
    67     }
       
    68 
       
    69     /**
       
    70      * An asserter for results produced when exercising of stream or terminal
       
    71      * tests.
       
    72      *
       
    73      * @param <R> the type of result to assert on
       
    74      */
       
    75     public interface ResultAsserter<R> {
       
    76         /**
       
    77          * Assert a result produced when exercising of stream or terminal
       
    78          * test.
       
    79          *
       
    80          * @param actual the actual result
       
    81          * @param expected the expected result
       
    82          * @param isOrdered true if the pipeline is ordered
       
    83          * @param isParallel true if the pipeline is parallel
       
    84          */
       
    85         void assertResult(R actual, R expected, boolean isOrdered, boolean isParallel);
       
    86     }
       
    87 
       
    88     // Exercise stream operations
       
    89 
       
    90     public interface BaseStreamTestScenario {
       
    91         StreamShape getShape();
       
    92 
       
    93         boolean isParallel();
       
    94 
       
    95         boolean isOrdered();
       
    96 
       
    97         default <T, S_IN extends BaseStream<T, S_IN>>
       
    98         S_IN getStream(TestData<T, S_IN> data) {
       
    99             return isParallel()
       
   100                    ? data.parallelStream()
       
   101                    : data.stream();
       
   102         }
       
   103 
       
   104         <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
       
   105         void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
       
   106     }
       
   107 
       
   108     protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
       
   109     Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
       
   110         return withData(data).stream(m).exercise();
       
   111     }
       
   112 
       
   113     // Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result
       
   114     // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
       
   115     @SafeVarargs
       
   116     protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
       
   117     Collection<U> exerciseOpsMulti(TestData<T, S_IN> data,
       
   118                                    Function<S_IN, S_OUT>... ms) {
       
   119         Collection<U> result = null;
       
   120         for (Function<S_IN, S_OUT> m : ms) {
       
   121             if (result == null)
       
   122                 result = withData(data).stream(m).exercise();
       
   123             else {
       
   124                 Collection<U> r2 = withData(data).stream(m).exercise();
       
   125                 assertEquals(result, r2);
       
   126             }
       
   127         }
       
   128         return result;
       
   129     }
       
   130 
       
   131     // Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result
       
   132     // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
       
   133     // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
       
   134     protected final
       
   135     Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data,
       
   136                                        Function<Stream<Integer>, Stream<Integer>> mRef,
       
   137                                        Function<IntStream, IntStream> mInt,
       
   138                                        Function<LongStream, LongStream> mLong,
       
   139                                        Function<DoubleStream, DoubleStream> mDouble) {
       
   140         @SuppressWarnings({ "rawtypes", "unchecked" })
       
   141         Function<Stream<Integer>, Stream<Integer>>[] ms = new Function[4];
       
   142         ms[0] = mRef;
       
   143         ms[1] = s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e);
       
   144         ms[2] = s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e);
       
   145         ms[3] = s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e);
       
   146         return exerciseOpsMulti(data, ms);
       
   147     }
       
   148 
       
   149     // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result
       
   150     // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
       
   151     protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
       
   152     void exerciseTerminalOpsMulti(TestData<T, S_IN> data,
       
   153                                   R expected,
       
   154                                   Map<String, Function<S_IN, S_OUT>> streams,
       
   155                                   Map<String, Function<S_OUT, R>> terminals) {
       
   156         for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) {
       
   157             setContext("Intermediate stream", se.getKey());
       
   158             for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) {
       
   159                 setContext("Terminal stream", te.getKey());
       
   160                 withData(data)
       
   161                         .terminal(se.getValue(), te.getValue())
       
   162                         .expectedResult(expected)
       
   163                         .exercise();
       
   164 
       
   165             }
       
   166         }
       
   167     }
       
   168 
       
   169     // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result
       
   170     // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
       
   171     // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
       
   172     protected final
       
   173     void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data,
       
   174                                 Collection<Integer> expected,
       
   175                                 String desc,
       
   176                                 Function<Stream<Integer>, Stream<Integer>> mRef,
       
   177                                 Function<IntStream, IntStream> mInt,
       
   178                                 Function<LongStream, LongStream> mLong,
       
   179                                 Function<DoubleStream, DoubleStream> mDouble,
       
   180                                 Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) {
       
   181 
       
   182         Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>();
       
   183         m.put("Ref " + desc, mRef);
       
   184         m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
       
   185         m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
       
   186         m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
       
   187 
       
   188         exerciseTerminalOpsMulti(data, expected, m, terminals);
       
   189     }
       
   190 
       
   191 
       
   192     protected <T, U, S_OUT extends BaseStream<U, S_OUT>>
       
   193     Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) {
       
   194         TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
       
   195         return withData(data1).stream(m).exercise();
       
   196     }
       
   197 
       
   198     protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
       
   199     Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) {
       
   200         TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
       
   201         return withData(data1).stream(m).expectedResult(expected).exercise();
       
   202     }
       
   203 
       
   204     @SuppressWarnings("unchecked")
       
   205     protected <U, S_OUT extends BaseStream<U, S_OUT>>
       
   206     Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) {
       
   207         return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise();
       
   208     }
       
   209 
       
   210     protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
       
   211         TestData.OfInt data1 = TestData.Factory.ofArray("int array", data);
       
   212         return withData(data1).stream(m).expectedResult(expected).exercise();
       
   213     }
       
   214 
       
   215     protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
       
   216         Objects.requireNonNull(data);
       
   217         return new DataStreamBuilder<>(data);
       
   218     }
       
   219 
       
   220     @SuppressWarnings({"rawtypes", "unchecked"})
       
   221     public class DataStreamBuilder<T, S_IN extends BaseStream<T, S_IN>> {
       
   222         final TestData<T, S_IN> data;
       
   223 
       
   224         private DataStreamBuilder(TestData<T, S_IN> data) {
       
   225             this.data = Objects.requireNonNull(data);
       
   226         }
       
   227 
       
   228         public <U, S_OUT extends BaseStream<U, S_OUT>>
       
   229         ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> ops(IntermediateTestOp... ops) {
       
   230             return new ExerciseDataStreamBuilder<>(data, (S_IN s) -> (S_OUT) chain(s, ops));
       
   231         }
       
   232 
       
   233         public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT>
       
   234         stream(Function<S_IN, S_OUT> m) {
       
   235             return new ExerciseDataStreamBuilder<>(data, m);
       
   236         }
       
   237 
       
   238         public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT>
       
   239         stream(Function<S_IN, S_OUT> m, IntermediateTestOp<U, U> additionalOp) {
       
   240             return new ExerciseDataStreamBuilder<>(data, s -> (S_OUT) chain(m.apply(s), additionalOp));
       
   241         }
       
   242 
       
   243         public <R> ExerciseDataTerminalBuilder<T, T, R, S_IN, S_IN>
       
   244         terminal(Function<S_IN, R> terminalF) {
       
   245             return new ExerciseDataTerminalBuilder<>(data, s -> s, terminalF);
       
   246         }
       
   247 
       
   248         public <U, R, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT>
       
   249         terminal(Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
       
   250             return new ExerciseDataTerminalBuilder<>(data, streamF, terminalF);
       
   251         }
       
   252     }
       
   253 
       
   254     @SuppressWarnings({"rawtypes", "unchecked"})
       
   255     public class ExerciseDataStreamBuilder<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> {
       
   256         final TestData<T, S_IN> data;
       
   257         final Function<S_IN, S_OUT> m;
       
   258         final StreamShape shape;
       
   259 
       
   260         Set<BaseStreamTestScenario> testSet = new HashSet<>();
       
   261 
       
   262         Collection<U> refResult;
       
   263 
       
   264         Consumer<TestData<T, S_IN>> before = LambdaTestHelpers.bEmpty;
       
   265 
       
   266         Consumer<TestData<T, S_IN>> after = LambdaTestHelpers.bEmpty;
       
   267 
       
   268         ResultAsserter<Iterable<U>> resultAsserter = (act, exp, ord, par) -> {
       
   269             if (par & !ord) {
       
   270                 LambdaTestHelpers.assertContentsUnordered(act, exp);
       
   271             }
       
   272             else {
       
   273                 LambdaTestHelpers.assertContentsEqual(act, exp);
       
   274             }
       
   275         };
       
   276 
       
   277         private ExerciseDataStreamBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
       
   278             this.data = data;
       
   279 
       
   280             this.m = Objects.requireNonNull(m);
       
   281 
       
   282             this.shape = ((AbstractPipeline<?, U, ?>) m.apply(data.stream())).getOutputShape();
       
   283 
       
   284             // Have to initiate from the output shape of the last stream
       
   285             // This means the stream mapper is required first rather than last
       
   286             testSet.addAll(testScenarios.get(shape));
       
   287         }
       
   288 
       
   289         //
       
   290 
       
   291         public <I extends Iterable<U>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(I expectedResult) {
       
   292             List<U> l = new ArrayList<>();
       
   293             expectedResult.forEach(l::add);
       
   294             refResult = l;
       
   295             return this;
       
   296         }
       
   297 
       
   298         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(int[] expectedResult) {
       
   299             List l = new ArrayList();
       
   300             for (int anExpectedResult : expectedResult) {
       
   301                 l.add(anExpectedResult);
       
   302             }
       
   303             refResult = l;
       
   304             return this;
       
   305         }
       
   306 
       
   307         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(long[] expectedResult) {
       
   308             List l = new ArrayList();
       
   309             for (long anExpectedResult : expectedResult) {
       
   310                 l.add(anExpectedResult);
       
   311             }
       
   312             refResult = l;
       
   313             return this;
       
   314         }
       
   315 
       
   316         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(double[] expectedResult) {
       
   317             List l = new ArrayList();
       
   318             for (double anExpectedResult : expectedResult) {
       
   319                 l.add(anExpectedResult);
       
   320             }
       
   321             refResult = l;
       
   322             return this;
       
   323         }
       
   324 
       
   325         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> before(Consumer<TestData<T, S_IN>> before) {
       
   326             this.before = Objects.requireNonNull(before);
       
   327             return this;
       
   328         }
       
   329 
       
   330         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> after(Consumer<TestData<T, S_IN>> after) {
       
   331             this.after = Objects.requireNonNull(after);
       
   332             return this;
       
   333         }
       
   334 
       
   335         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(BaseStreamTestScenario... tests) {
       
   336             return without(Arrays.asList(tests));
       
   337         }
       
   338 
       
   339         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(Collection<? extends BaseStreamTestScenario> tests) {
       
   340             for (BaseStreamTestScenario ts : tests) {
       
   341                 if (ts.getShape() == shape) {
       
   342                     testSet.remove(ts);
       
   343                 }
       
   344             }
       
   345 
       
   346             if (testSet.isEmpty()) {
       
   347                 throw new IllegalStateException("Test scenario set is empty");
       
   348             }
       
   349 
       
   350             return this;
       
   351         }
       
   352 
       
   353         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(BaseStreamTestScenario... tests) {
       
   354             return with(Arrays.asList(tests));
       
   355         }
       
   356 
       
   357         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(Collection<? extends BaseStreamTestScenario> tests) {
       
   358             testSet = new HashSet<>();
       
   359 
       
   360             for (BaseStreamTestScenario ts : tests) {
       
   361                 if (ts.getShape() == shape) {
       
   362                     testSet.add(ts);
       
   363                 }
       
   364             }
       
   365 
       
   366             if (testSet.isEmpty()) {
       
   367                 throw new IllegalStateException("Test scenario set is empty");
       
   368             }
       
   369 
       
   370             return this;
       
   371         }
       
   372 
       
   373         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> resultAsserter(ResultAsserter<Iterable<U>> resultAsserter) {
       
   374             this.resultAsserter = resultAsserter;
       
   375             return this;
       
   376         }
       
   377 
       
   378         // Build method
       
   379 
       
   380         public Collection<U> exercise() {
       
   381             final boolean isStreamOrdered;
       
   382             if (refResult == null) {
       
   383                 // Induce the reference result
       
   384                 before.accept(data);
       
   385                 try (S_OUT sOut = m.apply(data.stream())) {
       
   386                     isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
       
   387                     Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
       
   388                     refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
       
   389                 }
       
   390                 after.accept(data);
       
   391             }
       
   392             else {
       
   393                 try (S_OUT sOut = m.apply(data.stream())) {
       
   394                     isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
       
   395                 }
       
   396             }
       
   397 
       
   398             List<Error> errors = new ArrayList<>();
       
   399             for (BaseStreamTestScenario test : testSet) {
       
   400                 try {
       
   401                     before.accept(data);
       
   402 
       
   403                     List<U> result = new ArrayList<>();
       
   404                     test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m);
       
   405 
       
   406                     Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel());
       
   407 
       
   408                     if (refResult.size() > 1000) {
       
   409                         LambdaTestHelpers.launderAssertion(
       
   410                                 asserter,
       
   411                                 () -> String.format("%n%s: [actual size=%d] != [expected size=%d]", test, result.size(), refResult.size()));
       
   412                     }
       
   413                     else {
       
   414                         LambdaTestHelpers.launderAssertion(
       
   415                                 asserter,
       
   416                                 () -> String.format("%n%s: [actual] %s != [expected] %s", test, result, refResult));
       
   417                     }
       
   418 
       
   419                     after.accept(data);
       
   420                 } catch (Throwable t) {
       
   421                     errors.add(new Error(String.format("%s: %s", test, t), t));
       
   422                 }
       
   423             }
       
   424 
       
   425             if (!errors.isEmpty()) {
       
   426                 StringBuilder sb = new StringBuilder();
       
   427                 int i = 1;
       
   428                 for (Error t : errors) {
       
   429                     sb.append(i++).append(": ");
       
   430                     if (t instanceof AssertionError) {
       
   431                         sb.append(t).append("\n");
       
   432                     }
       
   433                     else {
       
   434                         StringWriter sw = new StringWriter();
       
   435                         PrintWriter pw = new PrintWriter(sw);
       
   436 
       
   437                         t.getCause().printStackTrace(pw);
       
   438                         pw.flush();
       
   439                         sb.append(t).append("\n").append(sw);
       
   440                     }
       
   441                 }
       
   442                 sb.append("--");
       
   443 
       
   444                 fail(String.format("%d failure(s) for test data: %s\n%s", i - 1, data.toString(), sb));
       
   445             }
       
   446 
       
   447             return refResult;
       
   448         }
       
   449     }
       
   450 
       
   451     // Exercise terminal operations
       
   452 
       
   453     interface BaseTerminalTestScenario<U, R, S_OUT extends BaseStream<U, S_OUT>> {
       
   454         boolean requiresSingleStageSource();
       
   455 
       
   456         boolean requiresParallelSource();
       
   457 
       
   458         default R run(Function<S_OUT, R> terminalF, S_OUT source, StreamShape shape) {
       
   459             return terminalF.apply(source);
       
   460         }
       
   461     }
       
   462 
       
   463     @SuppressWarnings({"rawtypes", "unchecked"})
       
   464     enum TerminalTestScenario implements BaseTerminalTestScenario {
       
   465         SINGLE_SEQUENTIAL(true, false),
       
   466 
       
   467         SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) {
       
   468             @Override
       
   469             public Object run(Function terminalF, BaseStream source, StreamShape shape) {
       
   470                 source = (BaseStream) chain(source, new ShortCircuitOp(shape));
       
   471                 return terminalF.apply(source);
       
   472             }
       
   473         },
       
   474 
       
   475         SINGLE_PARALLEL(true, true),
       
   476 
       
   477         ALL_SEQUENTIAL(false, false),
       
   478 
       
   479         ALL_SEQUENTIAL_SHORT_CIRCUIT(false, false) {
       
   480             @Override
       
   481             public Object run(Function terminalF, BaseStream source, StreamShape shape) {
       
   482                 source = (BaseStream) chain(source, new ShortCircuitOp(shape));
       
   483                 return terminalF.apply(source);
       
   484             }
       
   485         },
       
   486 
       
   487         ALL_PARALLEL(false, true),
       
   488 
       
   489         ALL_PARALLEL_SEQUENTIAL(false, false) {
       
   490             @Override
       
   491             public Object run(Function terminalF, BaseStream source, StreamShape shape) {
       
   492                 return terminalF.apply(source.sequential());
       
   493             }
       
   494         },
       
   495         ;
       
   496 
       
   497         private final boolean requiresSingleStageSource;
       
   498         private final boolean isParallel;
       
   499 
       
   500         TerminalTestScenario(boolean requiresSingleStageSource, boolean isParallel) {
       
   501             this.requiresSingleStageSource = requiresSingleStageSource;
       
   502             this.isParallel = isParallel;
       
   503         }
       
   504 
       
   505         @Override
       
   506         public boolean requiresSingleStageSource() {
       
   507             return requiresSingleStageSource;
       
   508         }
       
   509 
       
   510         @Override
       
   511         public boolean requiresParallelSource() {
       
   512             return isParallel;
       
   513         }
       
   514 
       
   515     }
       
   516 
       
   517     @SuppressWarnings({"rawtypes", "unchecked"})
       
   518     public class ExerciseDataTerminalBuilder<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> {
       
   519         final TestData<T, S_IN> data;
       
   520         final Function<S_IN, S_OUT> streamF;
       
   521         final Function<S_OUT, R> terminalF;
       
   522 
       
   523         R refResult;
       
   524 
       
   525         ResultAsserter<R> resultAsserter = (act, exp, ord, par) -> LambdaTestHelpers.assertContentsEqual(act, exp);
       
   526 
       
   527         private ExerciseDataTerminalBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
       
   528             this.data = data;
       
   529             this.streamF = Objects.requireNonNull(streamF);
       
   530             this.terminalF = Objects.requireNonNull(terminalF);
       
   531         }
       
   532 
       
   533         //
       
   534 
       
   535         public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> expectedResult(R expectedResult) {
       
   536             this.refResult = expectedResult;
       
   537             return this;
       
   538         }
       
   539 
       
   540         public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> equalator(BiConsumer<R, R> equalityAsserter) {
       
   541             resultAsserter = (act, exp, ord, par) -> equalityAsserter.accept(act, exp);
       
   542             return this;
       
   543         }
       
   544 
       
   545         public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> resultAsserter(ResultAsserter<R> resultAsserter) {
       
   546             this.resultAsserter = resultAsserter;
       
   547             return this;
       
   548         }
       
   549 
       
   550         // Build method
       
   551 
       
   552         public R exercise() {
       
   553             boolean isOrdered;
       
   554             StreamShape shape;
       
   555             Node<U> node;
       
   556             try (S_OUT out = streamF.apply(data.stream()).sequential()) {
       
   557                 AbstractPipeline ap = (AbstractPipeline) out;
       
   558                 isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags());
       
   559                 shape = ap.getOutputShape();
       
   560                 // Sequentially collect the output that will be input to the terminal op
       
   561                 node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]);
       
   562             }
       
   563 
       
   564             EnumSet<TerminalTestScenario> tests = EnumSet.allOf(TerminalTestScenario.class);
       
   565             if (refResult == null) {
       
   566                 // Induce the reference result
       
   567                 S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
       
   568                                                       StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
       
   569                                                       false);
       
   570 
       
   571                 refResult = (R) TerminalTestScenario.SINGLE_SEQUENTIAL.run(terminalF, source, shape);
       
   572                 tests.remove(TerminalTestScenario.SINGLE_SEQUENTIAL);
       
   573             }
       
   574 
       
   575             for (BaseTerminalTestScenario test : tests) {
       
   576                 S_OUT source;
       
   577                 if (test.requiresSingleStageSource()) {
       
   578                     source = (S_OUT) createPipeline(shape, node.spliterator(),
       
   579                                                     StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
       
   580                                                     test.requiresParallelSource());
       
   581                 }
       
   582                 else {
       
   583                     source = streamF.apply(test.requiresParallelSource()
       
   584                                            ? data.parallelStream() : data.stream());
       
   585                 }
       
   586 
       
   587                 R result;
       
   588                 try (source) {
       
   589                     result = (R) test.run(terminalF, source, shape);
       
   590                 }
       
   591                 LambdaTestHelpers.launderAssertion(
       
   592                         () -> resultAsserter.assertResult(result, refResult, isOrdered, test.requiresParallelSource()),
       
   593                         () -> String.format("%s: %s != %s", test, refResult, result));
       
   594             }
       
   595 
       
   596             return refResult;
       
   597         }
       
   598 
       
   599         AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags, boolean parallel) {
       
   600             switch (shape) {
       
   601                 case REFERENCE:    return new ReferencePipeline.Head<>(s, flags, parallel);
       
   602                 case INT_VALUE:    return new IntPipeline.Head(s, flags, parallel);
       
   603                 case LONG_VALUE:   return new LongPipeline.Head(s, flags, parallel);
       
   604                 case DOUBLE_VALUE: return new DoublePipeline.Head(s, flags, parallel);
       
   605                 default: throw new IllegalStateException("Unknown shape: " + shape);
       
   606             }
       
   607         }
       
   608     }
       
   609 
       
   610     protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
       
   611         TestData.OfRef<T> data1
       
   612                 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
       
   613         return withData(data1).terminal(m).expectedResult(expected).exercise();
       
   614     }
       
   615 
       
   616     protected <T, R, S_IN extends BaseStream<T, S_IN>> R
       
   617     exerciseTerminalOps(TestData<T, S_IN> data,
       
   618                         Function<S_IN, R> terminalF) {
       
   619         return withData(data).terminal(terminalF).exercise();
       
   620     }
       
   621 
       
   622     protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
       
   623     exerciseTerminalOps(TestData<T, S_IN> data,
       
   624                         Function<S_IN, S_OUT> streamF,
       
   625                         Function<S_OUT, R> terminalF) {
       
   626         return withData(data).terminal(streamF, terminalF).exercise();
       
   627     }
       
   628 
       
   629     //
       
   630 
       
   631     @SuppressWarnings({"rawtypes", "unchecked"})
       
   632     private static <T> AbstractPipeline<?, T, ?> chain(AbstractPipeline upstream, IntermediateTestOp<?, T> op) {
       
   633         return (AbstractPipeline<?, T, ?>) IntermediateTestOp.chain(upstream, op);
       
   634     }
       
   635 
       
   636     @SuppressWarnings({"rawtypes", "unchecked"})
       
   637     private static AbstractPipeline<?, ?, ?> chain(AbstractPipeline pipe, IntermediateTestOp... ops) {
       
   638         for (IntermediateTestOp op : ops)
       
   639             pipe = chain(pipe, op);
       
   640         return pipe;
       
   641     }
       
   642 
       
   643     @SuppressWarnings("rawtypes")
       
   644     private static <T> AbstractPipeline<?, T, ?> chain(BaseStream pipe, IntermediateTestOp<?, T> op) {
       
   645         return chain((AbstractPipeline) pipe, op);
       
   646     }
       
   647 
       
   648     @SuppressWarnings("rawtypes")
       
   649     public static AbstractPipeline<?, ?, ?> chain(BaseStream pipe, IntermediateTestOp... ops) {
       
   650         return chain((AbstractPipeline) pipe, ops);
       
   651     }
       
   652 
       
   653     // Test data
       
   654 
       
   655     static class ShortCircuitOp<T> implements StatelessTestOp<T,T> {
       
   656         private final StreamShape shape;
       
   657 
       
   658         ShortCircuitOp(StreamShape shape) {
       
   659             this.shape = shape;
       
   660         }
       
   661 
       
   662         @Override
       
   663         public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
       
   664             return sink;
       
   665         }
       
   666 
       
   667         @Override
       
   668         public int opGetFlags() {
       
   669             return StreamOpFlag.IS_SHORT_CIRCUIT;
       
   670         }
       
   671 
       
   672         @Override
       
   673         public StreamShape outputShape() {
       
   674             return shape;
       
   675         }
       
   676 
       
   677         @Override
       
   678         public StreamShape inputShape() {
       
   679             return shape;
       
   680         }
       
   681     }
       
   682 }