--- a/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java Mon Jun 22 13:30:21 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -22,7 +22,10 @@
*/
package java.util.stream;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
+import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -173,8 +176,8 @@
}
},
- // Wrap as parallel + collect
- PAR_STREAM_COLLECT(true) {
+ // Wrap as parallel + collect to list
+ PAR_STREAM_COLLECT_TO_LIST(true) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
for (U u : m.apply(data.parallelStream()).collect(Collectors.toList()))
@@ -182,8 +185,8 @@
}
},
- // Wrap sequential as parallel, + collect
- STREAM_TO_PAR_STREAM_COLLECT(true) {
+ // Wrap sequential as parallel, + collect to list
+ STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList()))
@@ -192,19 +195,56 @@
},
// Wrap parallel as sequential,, + collect
- PAR_STREAM_TO_STREAM_COLLECT(true) {
+ PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList()))
b.accept(u);
}
},
+
+ // Wrap as parallel stream + forEach synchronizing
+ PAR_STREAM_FOR_EACH(true, false) {
+ <T, U, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+ m.apply(data.parallelStream()).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
+
+ // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
+ PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
+ <T, U, S_IN extends BaseStream<T, S_IN>>
+ void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+ S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+ new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
+ m.apply(pipe1).forEach(e -> {
+ synchronized (data) {
+ b.accept(e);
+ }
+ });
+ }
+ },
;
- private boolean isParallel;
+ // The set of scenarios that clean the SIZED flag
+ public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
+ EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
+
+ private final boolean isParallel;
+
+ private final boolean isOrdered;
StreamTestScenario(boolean isParallel) {
+ this(isParallel, true);
+ }
+
+ StreamTestScenario(boolean isParallel, boolean isOrdered) {
this.isParallel = isParallel;
+ this.isOrdered = isOrdered;
}
public StreamShape getShape() {
@@ -215,6 +255,10 @@
return isParallel;
}
+ public boolean isOrdered() {
+ return isOrdered;
+ }
+
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
_run(data, b, (Function<S_IN, Stream<U>>) m);