8129120: Terminal operation properties should not be back-propagated to upstream operations
Reviewed-by: briangoetz, chegar
--- a/jdk/src/java.base/share/classes/java/util/stream/AbstractPipeline.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/AbstractPipeline.java Tue Jun 23 09:49:55 2015 +0200
@@ -249,6 +249,11 @@
// If the last intermediate operation is stateful then
// evaluate directly to avoid an extra collection step
if (isParallel() && previousStage != null && opIsStateful()) {
+ // Set the depth of this, last, pipeline stage to zero to slice the
+ // pipeline such that this operation will not be included in the
+ // upstream slice and upstream operations will not be included
+ // in this slice
+ depth = 0;
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
else {
@@ -402,47 +407,19 @@
throw new IllegalStateException(MSG_CONSUMED);
}
- boolean hasTerminalFlags = terminalFlags != 0;
if (isParallel() && sourceStage.sourceAnyStateful) {
- // Adjust pipeline stages if there are stateful ops,
- // and find the last short circuiting op, if any, that
- // defines the head stage for back-propagation of terminal flags
- @SuppressWarnings("rawtypes")
- AbstractPipeline backPropagationHead = sourceStage;
+ // Adapt the source spliterator, evaluating each stateful op
+ // in the pipeline up to and including this pipeline stage.
+ // The depth and flags of each pipeline stage are adjusted accordingly.
int depth = 1;
- for (@SuppressWarnings("rawtypes") AbstractPipeline p = sourceStage.nextStage;
- p != null;
- p = p.nextStage) {
- if (p.opIsStateful()) {
- if (StreamOpFlag.SHORT_CIRCUIT.isKnown(p.sourceOrOpFlags)) {
- // If the stateful operation is a short-circuit operation
- // then move the back propagation head forwards
- // NOTE: there are no size-injecting ops
- backPropagationHead = p;
- }
-
- depth = 0;
- }
- p.depth = depth++;
- }
-
- // Adapt the source spliterator, evaluating each stateful op
- // in the pipeline up to and including this pipeline stage
- // Flags for each pipeline stage are adjusted accordingly
- boolean backPropagate = false;
- int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
- if (hasTerminalFlags &&
- (backPropagate || (backPropagate = (u == backPropagationHead)))) {
- // Back-propagate flags from the terminal operation
- u.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, u.combinedFlags);
- }
-
int thisOpFlags = p.sourceOrOpFlags;
if (p.opIsStateful()) {
+ depth = 0;
+
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
// Clear the short circuit flag for next pipeline stage
// This stage encapsulates short-circuiting, the next
@@ -460,11 +437,12 @@
? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
}
+ p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
- if (hasTerminalFlags) {
+ if (terminalFlags != 0) {
// Apply flags from the terminal operation to last pipeline stage
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
@@ -472,7 +450,6 @@
return spliterator;
}
-
// PipelineHelper
@Override
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -22,7 +22,10 @@
*/
package java.util.stream;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.PrimitiveIterator;
+import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
@@ -159,12 +162,50 @@
for (double t : pipe2.toArray())
b.accept(t);
}
- },;
+ },
+
+ // Wrap as parallel stream + forEach synchronizing
+ PAR_STREAM_FOR_EACH(true, false) {
+ <T, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+ m.apply(data.parallelStream()).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
+
+ // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
+ PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
+ <T, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+ S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+ new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
+ m.apply(pipe1).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
+ ;
+
+ // The set of scenarios that clean the SIZED flag
+ public static final Set<DoubleStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
+ EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
private boolean isParallel;
+ private final boolean isOrdered;
+
DoubleStreamTestScenario(boolean isParallel) {
+ this(isParallel, true);
+ }
+
+ DoubleStreamTestScenario(boolean isParallel, boolean isOrdered) {
this.isParallel = isParallel;
+ this.isOrdered = isOrdered;
}
public StreamShape getShape() {
@@ -175,6 +216,10 @@
return isParallel;
}
+ public boolean isOrdered() {
+ return isOrdered;
+ }
+
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
_run(data, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -22,7 +22,10 @@
*/
package java.util.stream;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.PrimitiveIterator;
+import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -160,12 +163,50 @@
for (int t : pipe2.toArray())
b.accept(t);
}
- },;
+ },
+
+ // Wrap as parallel stream + forEach synchronizing
+ PAR_STREAM_FOR_EACH(true, false) {
+ <T, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
+ m.apply(data.parallelStream()).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
- private boolean isParallel;
+ // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
+ PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
+ <T, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
+ S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+ new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
+ m.apply(pipe1).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
+ ;
+
+ // The set of scenarios that clean the SIZED flag
+ public static final Set<IntStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
+ EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
+
+ private final boolean isParallel;
+
+ private final boolean isOrdered;
IntStreamTestScenario(boolean isParallel) {
+ this(isParallel, true);
+ }
+
+ IntStreamTestScenario(boolean isParallel, boolean isOrdered) {
this.isParallel = isParallel;
+ this.isOrdered = isOrdered;
}
public StreamShape getShape() {
@@ -176,6 +217,10 @@
return isParallel;
}
+ public boolean isOrdered() {
+ return isOrdered;
+ }
+
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
_run(data, (IntConsumer) b, (Function<S_IN, IntStream>) m);
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -22,7 +22,10 @@
*/
package java.util.stream;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.PrimitiveIterator;
+import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -159,12 +162,50 @@
for (long t : pipe2.toArray())
b.accept(t);
}
- },;
+ },
+
+ // Wrap as parallel stream + forEach synchronizing
+ PAR_STREAM_FOR_EACH(true, false) {
+ <T, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
+ m.apply(data.parallelStream()).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
+
+ // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
+ PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
+ <T, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
+ S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+ new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
+ m.apply(pipe1).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
+ ;
+
+ // The set of scenarios that clean the SIZED flag
+ public static final Set<LongStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
+ EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
private boolean isParallel;
+ private final boolean isOrdered;
+
LongStreamTestScenario(boolean isParallel) {
+ this(isParallel, true);
+ }
+
+ LongStreamTestScenario(boolean isParallel, boolean isOrdered) {
this.isParallel = isParallel;
+ this.isOrdered = isOrdered;
}
public StreamShape getShape() {
@@ -175,6 +216,10 @@
return isParallel;
}
+ public boolean isOrdered() {
+ return isOrdered;
+ }
+
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
_run(data, (LongConsumer) b, (Function<S_IN, LongStream>) m);
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -91,11 +92,13 @@
boolean isParallel();
- abstract <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
+ boolean isOrdered();
+
+ <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
}
- public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
+ protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
return withData(data).stream(m).exercise();
}
@@ -103,7 +106,7 @@
// Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result
// If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
@SafeVarargs
- public final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
+ protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
Collection<U> exerciseOpsMulti(TestData<T, S_IN> data,
Function<S_IN, S_OUT>... ms) {
Collection<U> result = null;
@@ -121,7 +124,7 @@
// Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result
// Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
// lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
- public final
+ protected final
Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data,
Function<Stream<Integer>, Stream<Integer>> mRef,
Function<IntStream, IntStream> mInt,
@@ -136,30 +139,73 @@
return exerciseOpsMulti(data, ms);
}
- public <T, U, S_OUT extends BaseStream<U, S_OUT>>
+ // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result
+ // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
+ protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
+ void exerciseTerminalOpsMulti(TestData<T, S_IN> data,
+ R expected,
+ Map<String, Function<S_IN, S_OUT>> streams,
+ Map<String, Function<S_OUT, R>> terminals) {
+ for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) {
+ setContext("Intermediate stream", se.getKey());
+ for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) {
+ setContext("Terminal stream", te.getKey());
+ withData(data)
+ .terminal(se.getValue(), te.getValue())
+ .expectedResult(expected)
+ .exercise();
+
+ }
+ }
+ }
+
+ // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result
+ // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
+ // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
+ protected final
+ void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data,
+ Collection<Integer> expected,
+ String desc,
+ Function<Stream<Integer>, Stream<Integer>> mRef,
+ Function<IntStream, IntStream> mInt,
+ Function<LongStream, LongStream> mLong,
+ Function<DoubleStream, DoubleStream> mDouble,
+ Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) {
+
+ Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>();
+ m.put("Ref " + desc, mRef);
+ m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
+ m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
+ m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
+
+ exerciseTerminalOpsMulti(data, expected, m, terminals);
+ }
+
+
+ protected <T, U, S_OUT extends BaseStream<U, S_OUT>>
Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) {
TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
return withData(data1).stream(m).exercise();
}
- public <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
+ protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) {
TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
return withData(data1).stream(m).expectedResult(expected).exercise();
}
@SuppressWarnings("unchecked")
- public <U, S_OUT extends BaseStream<U, S_OUT>>
+ protected <U, S_OUT extends BaseStream<U, S_OUT>>
Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) {
return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise();
}
- public Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
+ protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
TestData.OfInt data1 = TestData.Factory.ofArray("int array", data);
return withData(data1).stream(m).expectedResult(expected).exercise();
}
- public <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
+ protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
Objects.requireNonNull(data);
return new DataStreamBuilder<>(data);
}
@@ -325,19 +371,19 @@
// Build method
public Collection<U> exercise() {
- final boolean isOrdered;
+ final boolean isStreamOrdered;
if (refResult == null) {
// Induce the reference result
before.accept(data);
S_OUT sOut = m.apply(data.stream());
- isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
+ isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
after.accept(data);
}
else {
S_OUT sOut = m.apply(data.stream());
- isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
+ isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
}
List<Error> errors = new ArrayList<>();
@@ -348,7 +394,7 @@
List<U> result = new ArrayList<>();
test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m);
- Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isOrdered, test.isParallel());
+ Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel());
if (refResult.size() > 1000) {
LambdaTestHelpers.launderAssertion(
@@ -406,7 +452,7 @@
}
@SuppressWarnings({"rawtypes", "unchecked"})
- static enum TerminalTestScenario implements BaseTerminalTestScenario {
+ enum TerminalTestScenario implements BaseTerminalTestScenario {
SINGLE_SEQUENTIAL(true, false),
SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) {
@@ -546,19 +592,19 @@
}
}
- public <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
+ protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
TestData.OfRef<T> data1
= TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
return withData(data1).terminal(m).expectedResult(expected).exercise();
}
- public <T, R, S_IN extends BaseStream<T, S_IN>> R
+ protected <T, R, S_IN extends BaseStream<T, S_IN>> R
exerciseTerminalOps(TestData<T, S_IN> data,
Function<S_IN, R> terminalF) {
return withData(data).terminal(terminalF).exercise();
}
- public <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
+ protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
exerciseTerminalOps(TestData<T, S_IN> data,
Function<S_IN, S_OUT> streamF,
Function<S_OUT, R> terminalF) {
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -22,7 +22,10 @@
*/
package java.util.stream;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
+import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -173,8 +176,8 @@
}
},
- // Wrap as parallel + collect
- PAR_STREAM_COLLECT(true) {
+ // Wrap as parallel + collect to list
+ PAR_STREAM_COLLECT_TO_LIST(true) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
for (U u : m.apply(data.parallelStream()).collect(Collectors.toList()))
@@ -182,8 +185,8 @@
}
},
- // Wrap sequential as parallel, + collect
- STREAM_TO_PAR_STREAM_COLLECT(true) {
+ // Wrap sequential as parallel, + collect to list
+ STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList()))
@@ -192,19 +195,56 @@
},
// Wrap parallel as sequential,, + collect
- PAR_STREAM_TO_STREAM_COLLECT(true) {
+ PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList()))
b.accept(u);
}
},
+
+ // Wrap as parallel stream + forEach synchronizing
+ PAR_STREAM_FOR_EACH(true, false) {
+ <T, U, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+ m.apply(data.parallelStream()).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
+
+ // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
+ PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
+ <T, U, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+ S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+ new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
+ m.apply(pipe1).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
;
- private boolean isParallel;
+ // The set of scenarios that clean the SIZED flag
+ public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
+ EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
+
+ private final boolean isParallel;
+
+ private final boolean isOrdered;
StreamTestScenario(boolean isParallel) {
+ this(isParallel, true);
+ }
+
+ StreamTestScenario(boolean isParallel, boolean isOrdered) {
this.isParallel = isParallel;
+ this.isOrdered = isOrdered;
}
public StreamShape getShape() {
@@ -215,6 +255,10 @@
return isParallel;
}
+ public boolean isOrdered() {
+ return isOrdered;
+ }
+
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
_run(data, b, (Function<S_IN, Stream<U>>) m);
--- a/jdk/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -112,7 +112,7 @@
FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
withData(data).ops(opsArray).
- without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
@@ -152,7 +152,7 @@
withData(data).ops(opsArray).
- without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
@@ -185,7 +185,7 @@
IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
withData(data).ops(opsArray).
- without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
@@ -221,7 +221,7 @@
IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
withData(data).ops(opsArray).
- without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
--- a/jdk/test/java/util/stream/boottest/java/util/stream/UnorderedTest.java Mon Jun 22 13:30:21 2015 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,265 +0,0 @@
-/*
- * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import org.testng.annotations.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.function.UnaryOperator;
-
-@Test
-public class UnorderedTest extends OpTestCase {
-
- @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
- public void testTerminalOps(String name, TestData<Integer, Stream<Integer>> data) {
- testTerminal(data, s -> { s.forEach(x -> { }); return 0; });
-
- testTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
-
- testTerminal(data, s -> s.anyMatch(e -> true));
- }
-
-
- private <T, R> void testTerminal(TestData<T, Stream<T>> data, Function<Stream<T>, R> terminalF) {
- testTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
- }
-
- static class WrappingUnaryOperator<S> implements UnaryOperator<S> {
-
- final boolean isLimit;
- final UnaryOperator<S> uo;
-
- WrappingUnaryOperator(UnaryOperator<S> uo) {
- this(uo, false);
- }
-
- WrappingUnaryOperator(UnaryOperator<S> uo, boolean isLimit) {
- this.uo = uo;
- this.isLimit = isLimit;
- }
-
- @Override
- public S apply(S s) {
- return uo.apply(s);
- }
- }
-
- static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo) {
- return new WrappingUnaryOperator<>(uo);
- }
-
- static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo, boolean isLimit) {
- return new WrappingUnaryOperator<>(uo, isLimit);
- }
-
- @SuppressWarnings("rawtypes")
- private List permutationOfFunctions =
- LambdaTestHelpers.perm(Arrays.<WrappingUnaryOperator<Stream<Object>>>asList(
- wrap(s -> s.sorted()),
- wrap(s -> s.distinct()),
- wrap(s -> s.limit(5), true)
- ));
-
- @SuppressWarnings("unchecked")
- private <T, R> void testTerminal(TestData<T, Stream<T>> data,
- Function<Stream<T>, R> terminalF,
- BiConsumer<R, R> equalityAsserter) {
- testTerminal(data, terminalF, equalityAsserter, permutationOfFunctions, StreamShape.REFERENCE);
- }
-
- //
-
- @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
- public void testIntTerminalOps(String name, TestData.OfInt data) {
- testIntTerminal(data, s -> { s.forEach(x -> { }); return 0; });
- testIntTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
- testIntTerminal(data, s -> s.anyMatch(e -> true));
- }
-
-
- private <T, R> void testIntTerminal(TestData.OfInt data, Function<IntStream, R> terminalF) {
- testIntTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
- }
-
- private List<List<WrappingUnaryOperator<IntStream>>> intPermutationOfFunctions =
- LambdaTestHelpers.perm(Arrays.asList(
- wrap(s -> s.sorted()),
- wrap(s -> s.distinct()),
- wrap(s -> s.limit(5), true)
- ));
-
- private <R> void testIntTerminal(TestData.OfInt data,
- Function<IntStream, R> terminalF,
- BiConsumer<R, R> equalityAsserter) {
- testTerminal(data, terminalF, equalityAsserter, intPermutationOfFunctions, StreamShape.INT_VALUE);
- }
-
- //
-
- @Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class)
- public void testLongTerminalOps(String name, TestData.OfLong data) {
- testLongTerminal(data, s -> { s.forEach(x -> { }); return 0; });
- testLongTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
- testLongTerminal(data, s -> s.anyMatch(e -> true));
- }
-
-
- private <T, R> void testLongTerminal(TestData.OfLong data, Function<LongStream, R> terminalF) {
- testLongTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
- }
-
- private List<List<WrappingUnaryOperator<LongStream>>> longPermutationOfFunctions =
- LambdaTestHelpers.perm(Arrays.asList(
- wrap(s -> s.sorted()),
- wrap(s -> s.distinct()),
- wrap(s -> s.limit(5), true)
- ));
-
- private <R> void testLongTerminal(TestData.OfLong data,
- Function<LongStream, R> terminalF,
- BiConsumer<R, R> equalityAsserter) {
- testTerminal(data, terminalF, equalityAsserter, longPermutationOfFunctions, StreamShape.LONG_VALUE);
- }
-
- //
-
- @Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class)
- public void testDoubleTerminalOps(String name, TestData.OfDouble data) {
- testDoubleTerminal(data, s -> { s.forEach(x -> { }); return 0; });
- testDoubleTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
- testDoubleTerminal(data, s -> s.anyMatch(e -> true));
- }
-
-
- private <T, R> void testDoubleTerminal(TestData.OfDouble data, Function<DoubleStream, R> terminalF) {
- testDoubleTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
- }
-
- private List<List<WrappingUnaryOperator<DoubleStream>>> doublePermutationOfFunctions =
- LambdaTestHelpers.perm(Arrays.asList(
- wrap(s -> s.sorted()),
- wrap(s -> s.distinct()),
- wrap(s -> s.limit(5), true)
- ));
-
- private <R> void testDoubleTerminal(TestData.OfDouble data,
- Function<DoubleStream, R> terminalF,
- BiConsumer<R, R> equalityAsserter) {
- testTerminal(data, terminalF, equalityAsserter, doublePermutationOfFunctions, StreamShape.DOUBLE_VALUE);
- }
-
- //
-
- private <T, S extends BaseStream<T, S>, R> void testTerminal(TestData<T, S> data,
- Function<S, R> terminalF,
- BiConsumer<R, R> equalityAsserter,
- List<List<WrappingUnaryOperator<S>>> pFunctions,
- StreamShape shape) {
- CheckClearOrderedOp<T> checkClearOrderedOp = new CheckClearOrderedOp<>(shape);
- for (List<WrappingUnaryOperator<S>> f : pFunctions) {
- @SuppressWarnings("unchecked")
- UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkClearOrderedOp));
- withData(data).
- terminal(fi, terminalF).
- equalator(equalityAsserter).
- exercise();
- }
-
- CheckSetOrderedOp<T> checkSetOrderedOp = new CheckSetOrderedOp<>(shape);
- for (List<WrappingUnaryOperator<S>> f : pFunctions) {
- @SuppressWarnings("unchecked")
- UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkSetOrderedOp));
- withData(data).
- terminal(fi, s -> terminalF.apply(s.sequential())).
- equalator(equalityAsserter).
- exercise();
- }
- }
-
- static class CheckClearOrderedOp<T> implements StatelessTestOp<T, T> {
- private final StreamShape shape;
-
- CheckClearOrderedOp(StreamShape shape) {
- this.shape = shape;
- }
-
- @Override
- public StreamShape outputShape() {
- return shape;
- }
-
- @Override
- public StreamShape inputShape() {
- return shape;
- }
-
- @Override
- public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
- if (parallel) {
- assertTrue(StreamOpFlag.ORDERED.isCleared(flags));
- }
-
- return sink;
- }
- }
-
- static class CheckSetOrderedOp<T> extends CheckClearOrderedOp<T> {
-
- CheckSetOrderedOp(StreamShape shape) {
- super(shape);
- }
-
- @Override
- public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
- assertTrue(StreamOpFlag.ORDERED.isKnown(flags) || StreamOpFlag.ORDERED.isPreserved(flags));
-
- return sink;
- }
- }
-
- private <T, S extends BaseStream<T, S>>
- UnaryOperator<S> interpose(List<WrappingUnaryOperator<S>> fs, UnaryOperator<S> fi) {
- int l = -1;
- for (int i = 0; i < fs.size(); i++) {
- if (fs.get(i).isLimit) {
- l = i;
- }
- }
-
- final int lastLimitIndex = l;
- return s -> {
- if (lastLimitIndex == -1 && fs.size() > 0)
- s = fi.apply(s);
- for (int i = 0; i < fs.size(); i++) {
- s = fs.get(i).apply(s);
- if (i >= lastLimitIndex) {
- s = fi.apply(s);
- }
- }
- return s;
- };
- }
-}
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -192,7 +192,7 @@
public void testInts(TestData.OfInt data, ResultAsserter<Iterable<Integer>> ra) {
withData(data).
stream(s -> s).
- without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS).
resultAsserter(ra).
exercise();
}
@@ -276,7 +276,7 @@
public void testLongs(TestData.OfLong data, ResultAsserter<Iterable<Long>> ra) {
withData(data).
stream(s -> s).
- without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS).
resultAsserter(ra).
exercise();
}
@@ -360,7 +360,7 @@
public void testDoubles(TestData.OfDouble data, ResultAsserter<Iterable<Double>> ra) {
withData(data).
stream(s -> s).
- without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS).
resultAsserter(ra).
exercise();
}
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java Tue Jun 23 09:49:55 2015 +0200
@@ -32,7 +32,16 @@
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.*;
+import java.util.stream.CollectorOps;
+import java.util.stream.Collectors;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.OpTestCase;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import java.util.stream.StreamTestDataProvider;
+import java.util.stream.TestData;
import static java.util.stream.LambdaTestHelpers.*;
@@ -67,7 +76,12 @@
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testOp(String name, TestData.OfRef<Integer> data) {
- Collection<Integer> result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct);
+ Collection<Integer> result = exerciseOpsInt(
+ data,
+ Stream::distinct,
+ IntStream::distinct,
+ LongStream::distinct,
+ DoubleStream::distinct);
assertUnique(result);
assertTrue((data.size() > 0) ? result.size() > 0 : result.size() == 0);
@@ -127,9 +141,13 @@
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testDistinctDistinct(String name, TestData.OfRef<Integer> data) {
- Collection<Integer> result = withData(data)
- .stream(s -> s.distinct().distinct())
- .exercise();
+ Collection<Integer> result = exerciseOpsInt(
+ data,
+ s -> s.distinct().distinct(),
+ s -> s.distinct().distinct(),
+ s -> s.distinct().distinct(),
+ s -> s.distinct().distinct());
+
assertUnique(result);
}
@@ -152,4 +170,31 @@
assertUnique(result);
assertSorted(result);
}
+
+ @Test
+ public void testStable() {
+ // Create N instances of Integer all with the same value
+ List<Integer> input = IntStream.rangeClosed(0, 1000)
+ .mapToObj(i -> new Integer(1000)) // explicit construction
+ .collect(Collectors.toList());
+ Integer expectedElement = input.get(0);
+ TestData<Integer, Stream<Integer>> data = TestData.Factory.ofCollection(
+ "1000 instances of Integer with the same value", input);
+
+ withData(data)
+ .stream(Stream::distinct)
+ .resultAsserter((actual, expected, isOrdered, isParallel) -> {
+ List<Integer> l = new ArrayList<>();
+ actual.forEach(l::add);
+
+ // Assert stability
+ // The single result element should be equal in identity to
+ // the first input element
+ assertEquals(l.size(), 1);
+ assertEquals(System.identityHashCode(l.get(0)),
+ System.identityHashCode(expectedElement));
+
+ })
+ .exercise();
+ }
}
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -181,7 +181,7 @@
// slice implementations
withData(refLongs()).
stream(s -> fs.apply(s)).
- without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
@@ -192,7 +192,7 @@
// slice implementations
withData(ints()).
stream(s -> fs.apply(s)).
- without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
@@ -203,7 +203,7 @@
// slice implementations
withData(longs()).
stream(s -> fs.apply(s)).
- without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
@@ -214,7 +214,7 @@
// slice implementations
withData(doubles()).
stream(s -> fs.apply(s)).
- without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+ without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}