8148250: Stream.limit() parallel tasks with ordered non-SUBSIZED source should short-circuit
authortvaleev
Mon, 08 Feb 2016 10:37:37 +0100
changeset 35712 db033bd15d41
parent 35711 d08f24178498
child 35713 f61cb8475e5a
8148250: Stream.limit() parallel tasks with ordered non-SUBSIZED source should short-circuit Reviewed-by: psandoz
jdk/src/java.base/share/classes/java/util/stream/SliceOps.java
jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/SliceOpTest.java
--- a/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java	Fri Feb 05 15:23:15 2016 +0100
+++ b/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java	Mon Feb 08 10:37:37 2016 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -139,7 +139,7 @@
                 }
                 else {
                     // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
-                    //     regardless of the value of n
+                    //     when n * parallelismLevel is sufficiently large.
                     //     Need to adjust the target size of splitting for the
                     //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
                     //     This will limit the size of the buffers created at the leaf nodes
@@ -604,8 +604,15 @@
                 return nb.build();
             }
             else {
-                Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
-                                                          spliterator).build();
+                final Node.Builder<P_OUT> nb = op.makeNodeBuilder(-1, generator);
+                if (targetOffset == 0) { // limit only
+                    Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
+                    helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
+                }
+                else {
+                    helper.wrapAndCopyInto(nb, spliterator);
+                }
+                Node<P_OUT> node = nb.build();
                 thisNodeSize = node.count();
                 completed = true;
                 spliterator = null;
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/SliceOpTest.java	Fri Feb 05 15:23:15 2016 +0100
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/SliceOpTest.java	Mon Feb 08 10:37:37 2016 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -20,6 +20,12 @@
  * or visit www.oracle.com if you need additional information or have any
  * questions.
  */
+
+/*
+ * @test
+ * @bug 8148250
+ */
+
 package org.openjdk.tests.java.util.stream;
 
 import org.testng.annotations.Test;
@@ -341,4 +347,13 @@
             return Arrays.asList(0, 1, size / 2, size - 1, size, size + 1, 2 * size);
         }
     }
+
+    public void testLimitParallelHugeInput() {
+        for (int n : new int[] {10, 100, 1000, 10000}) {
+            long[] actual = LongStream.range(0, Long.MAX_VALUE)
+                                  .parallel().filter(x -> true) // remove SIZED
+                                  .limit(n).toArray();
+            assertEquals(LongStream.range(0, n).toArray(), actual);
+        }
+    }
 }