8148250: Stream.limit() parallel tasks with ordered non-SUBSIZED source should short-circuit
Reviewed-by: psandoz
--- a/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java Fri Feb 05 15:23:15 2016 +0100
+++ b/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java Mon Feb 08 10:37:37 2016 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -139,7 +139,7 @@
}
else {
// @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
- // regardless of the value of n
+ // when n * parallelismLevel is sufficiently large.
// Need to adjust the target size of splitting for the
// SliceTask from say (size / k) to say min(size / k, 1 << 14)
// This will limit the size of the buffers created at the leaf nodes
@@ -604,8 +604,15 @@
return nb.build();
}
else {
- Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
- spliterator).build();
+ final Node.Builder<P_OUT> nb = op.makeNodeBuilder(-1, generator);
+ if (targetOffset == 0) { // limit only
+ Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
+ helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
+ }
+ else {
+ helper.wrapAndCopyInto(nb, spliterator);
+ }
+ Node<P_OUT> node = nb.build();
thisNodeSize = node.count();
completed = true;
spliterator = null;
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/SliceOpTest.java Fri Feb 05 15:23:15 2016 +0100
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/SliceOpTest.java Mon Feb 08 10:37:37 2016 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -20,6 +20,12 @@
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
+
+/*
+ * @test
+ * @bug 8148250
+ */
+
package org.openjdk.tests.java.util.stream;
import org.testng.annotations.Test;
@@ -341,4 +347,13 @@
return Arrays.asList(0, 1, size / 2, size - 1, size, size + 1, 2 * size);
}
}
+
+ public void testLimitParallelHugeInput() {
+ for (int n : new int[] {10, 100, 1000, 10000}) {
+ long[] actual = LongStream.range(0, Long.MAX_VALUE)
+ .parallel().filter(x -> true) // remove SIZED
+ .limit(n).toArray();
+ assertEquals(LongStream.range(0, n).toArray(), actual);
+ }
+ }
}