8067969: Optimize Stream.count for SIZED Streams
Reviewed-by: psandoz, chegar
Contributed-by: Aggelos Biboudis <biboudis@gmail.com>
--- a/jdk/src/java.base/share/classes/java/util/stream/DoublePipeline.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/DoublePipeline.java Mon Mar 16 10:19:49 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -447,7 +447,7 @@
@Override
public final long count() {
- return mapToLong(e -> 1L).sum();
+ return evaluate(ReduceOps.makeDoubleCounting());
}
@Override
--- a/jdk/src/java.base/share/classes/java/util/stream/DoubleStream.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/DoubleStream.java Mon Mar 16 10:19:49 2015 +0100
@@ -581,6 +581,24 @@
*
* <p>This is a <a href="package-summary.html#StreamOps">terminal operation</a>.
*
+ * @apiNote
+ * An implementation may choose to not execute the stream pipeline (either
+ * sequentially or in parallel) if it is capable of computing the count
+ * directly from the stream source. In such cases no source elements will
+ * be traversed and no intermediate operations will be evaluated.
+ * Behavioral parameters with side-effects, which are strongly discouraged
+ * except for harmless cases such as debugging, may be affected. For
+ * example, consider the following stream:
+ * <pre>{@code
+ * DoubleStream s = DoubleStream.of(1, 2, 3, 4);
+ * long count = s.peek(System.out::println).count();
+ * }</pre>
+ * The number of elements covered by the stream source is known and the
+ * intermediate operation, {@code peek}, does not inject into or remove
+ * elements from the stream (as may be the case for {@code flatMap} or
+ * {@code filter} operations). Thus the count is 4 and there is no need to
+ * execute the pipeline and, as a side-effect, print out the elements.
+ *
* @return the count of elements in this stream
*/
long count();
--- a/jdk/src/java.base/share/classes/java/util/stream/IntPipeline.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/IntPipeline.java Mon Mar 16 10:19:49 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -427,7 +427,7 @@
@Override
public final long count() {
- return mapToLong(e -> 1L).sum();
+ return evaluate(ReduceOps.makeIntCounting());
}
@Override
--- a/jdk/src/java.base/share/classes/java/util/stream/IntStream.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/IntStream.java Mon Mar 16 10:19:49 2015 +0100
@@ -504,6 +504,24 @@
*
* <p>This is a <a href="package-summary.html#StreamOps">terminal operation</a>.
*
+ * @apiNote
+ * An implementation may choose to not execute the stream pipeline (either
+ * sequentially or in parallel) if it is capable of computing the count
+ * directly from the stream source. In such cases no source elements will
+ * be traversed and no intermediate operations will be evaluated.
+ * Behavioral parameters with side-effects, which are strongly discouraged
+ * except for harmless cases such as debugging, may be affected. For
+ * example, consider the following stream:
+ * <pre>{@code
+ * IntStream s = IntStream.of(1, 2, 3, 4);
+ * long count = s.peek(System.out::println).count();
+ * }</pre>
+ * The number of elements covered by the stream source is known and the
+ * intermediate operation, {@code peek}, does not inject into or remove
+ * elements from the stream (as may be the case for {@code flatMap} or
+ * {@code filter} operations). Thus the count is 4 and there is no need to
+ * execute the pipeline and, as a side-effect, print out the elements.
+ *
* @return the count of elements in this stream
*/
long count();
--- a/jdk/src/java.base/share/classes/java/util/stream/LongPipeline.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/LongPipeline.java Mon Mar 16 10:19:49 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -425,7 +425,7 @@
@Override
public final long count() {
- return map(e -> 1L).sum();
+ return evaluate(ReduceOps.makeLongCounting());
}
@Override
--- a/jdk/src/java.base/share/classes/java/util/stream/LongStream.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/LongStream.java Mon Mar 16 10:19:49 2015 +0100
@@ -509,6 +509,24 @@
*
* <p>This is a <a href="package-summary.html#StreamOps">terminal operation</a>.
*
+ * @apiNote
+ * An implementation may choose to not execute the stream pipeline (either
+ * sequentially or in parallel) if it is capable of computing the count
+ * directly from the stream source. In such cases no source elements will
+ * be traversed and no intermediate operations will be evaluated.
+ * Behavioral parameters with side-effects, which are strongly discouraged
+ * except for harmless cases such as debugging, may be affected. For
+ * example, consider the following stream:
+ * <pre>{@code
+ * LongStream s = LongStream.of(1, 2, 3, 4);
+ * long count = s.peek(System.out::println).count();
+ * }</pre>
+ * The number of elements covered by the stream source is known and the
+ * intermediate operation, {@code peek}, does not inject into or remove
+ * elements from the stream (as may be the case for {@code flatMap} or
+ * {@code filter} operations). Thus the count is 4 and there is no need to
+ * execute the pipeline and, as a side-effect, print out the elements.
+ *
* @return the count of elements in this stream
*/
long count();
--- a/jdk/src/java.base/share/classes/java/util/stream/ReduceOps.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/ReduceOps.java Mon Mar 16 10:19:49 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -234,6 +234,40 @@
}
/**
+ * Constructs a {@code TerminalOp} that counts the number of stream
+ * elements. If the size of the pipeline is known then count is the size
+ * and there is no need to evaluate the pipeline. If the size of the
+ * pipeline is non known then count is produced, via reduction, using a
+ * {@link CountingSink}.
+ *
+ * @param <T> the type of the input elements
+ * @return a {@code TerminalOp} implementing the counting
+ */
+ public static <T> TerminalOp<T, Long>
+ makeRefCounting() {
+ return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
+ @Override
+ public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }
+
+ @Override
+ public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
+ return spliterator.getExactSizeIfKnown();
+ return super.evaluateSequential(helper, spliterator);
+ }
+
+ @Override
+ public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
+ return spliterator.getExactSizeIfKnown();
+ return super.evaluateParallel(helper, spliterator);
+ }
+ };
+ }
+
+ /**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code int} values.
*
@@ -370,6 +404,39 @@
}
/**
+ * Constructs a {@code TerminalOp} that counts the number of stream
+ * elements. If the size of the pipeline is known then count is the size
+ * and there is no need to evaluate the pipeline. If the size of the
+ * pipeline is non known then count is produced, via reduction, using a
+ * {@link CountingSink}.
+ *
+ * @return a {@code TerminalOp} implementing the counting
+ */
+ public static TerminalOp<Integer, Long>
+ makeIntCounting() {
+ return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.REFERENCE) {
+ @Override
+ public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }
+
+ @Override
+ public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
+ return spliterator.getExactSizeIfKnown();
+ return super.evaluateSequential(helper, spliterator);
+ }
+
+ @Override
+ public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
+ return spliterator.getExactSizeIfKnown();
+ return super.evaluateParallel(helper, spliterator);
+ }
+ };
+ }
+
+ /**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code long} values.
*
@@ -506,6 +573,39 @@
}
/**
+ * Constructs a {@code TerminalOp} that counts the number of stream
+ * elements. If the size of the pipeline is known then count is the size
+ * and there is no need to evaluate the pipeline. If the size of the
+ * pipeline is non known then count is produced, via reduction, using a
+ * {@link CountingSink}.
+ *
+ * @return a {@code TerminalOp} implementing the counting
+ */
+ public static TerminalOp<Long, Long>
+ makeLongCounting() {
+ return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.REFERENCE) {
+ @Override
+ public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }
+
+ @Override
+ public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
+ return spliterator.getExactSizeIfKnown();
+ return super.evaluateSequential(helper, spliterator);
+ }
+
+ @Override
+ public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
+ return spliterator.getExactSizeIfKnown();
+ return super.evaluateParallel(helper, spliterator);
+ }
+ };
+ }
+
+ /**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code double} values.
*
@@ -642,6 +742,91 @@
}
/**
+ * Constructs a {@code TerminalOp} that counts the number of stream
+ * elements. If the size of the pipeline is known then count is the size
+ * and there is no need to evaluate the pipeline. If the size of the
+ * pipeline is non known then count is produced, via reduction, using a
+ * {@link CountingSink}.
+ *
+ * @return a {@code TerminalOp} implementing the counting
+ */
+ public static TerminalOp<Double, Long>
+ makeDoubleCounting() {
+ return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.REFERENCE) {
+ @Override
+ public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }
+
+ @Override
+ public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
+ return spliterator.getExactSizeIfKnown();
+ return super.evaluateSequential(helper, spliterator);
+ }
+
+ @Override
+ public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
+ return spliterator.getExactSizeIfKnown();
+ return super.evaluateParallel(helper, spliterator);
+ }
+ };
+ }
+
+ /**
+ * A sink that counts elements
+ */
+ static abstract class CountingSink<T>
+ extends Box<Long>
+ implements AccumulatingSink<T, Long, CountingSink<T>> {
+ long count;
+
+ @Override
+ public void begin(long size) {
+ count = 0L;
+ }
+
+ @Override
+ public Long get() {
+ return count;
+ }
+
+ @Override
+ public void combine(CountingSink<T> other) {
+ count += other.count;
+ }
+
+ static final class OfRef<T> extends CountingSink<T> {
+ @Override
+ public void accept(T t) {
+ count++;
+ }
+ }
+
+ static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
+ @Override
+ public void accept(int t) {
+ count++;
+ }
+ }
+
+ static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
+ @Override
+ public void accept(long t) {
+ count++;
+ }
+ }
+
+ static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
+ @Override
+ public void accept(double t) {
+ count++;
+ }
+ }
+ }
+
+ /**
* A type of {@code TerminalSink} that implements an associative reducing
* operation on elements of type {@code T} and producing a result of type
* {@code R}.
@@ -652,7 +837,7 @@
*/
private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
extends TerminalSink<T, R> {
- public void combine(K other);
+ void combine(K other);
}
/**
--- a/jdk/src/java.base/share/classes/java/util/stream/ReferencePipeline.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/ReferencePipeline.java Mon Mar 16 10:19:49 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -523,10 +523,9 @@
@Override
public final long count() {
- return mapToLong(e -> 1L).sum();
+ return evaluate(ReduceOps.makeRefCounting());
}
-
//
/**
--- a/jdk/src/java.base/share/classes/java/util/stream/Stream.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/stream/Stream.java Mon Mar 16 10:19:49 2015 +0100
@@ -851,6 +851,25 @@
*
* <p>This is a <a href="package-summary.html#StreamOps">terminal operation</a>.
*
+ * @apiNote
+ * An implementation may choose to not execute the stream pipeline (either
+ * sequentially or in parallel) if it is capable of computing the count
+ * directly from the stream source. In such cases no source elements will
+ * be traversed and no intermediate operations will be evaluated.
+ * Behavioral parameters with side-effects, which are strongly discouraged
+ * except for harmless cases such as debugging, may be affected. For
+ * example, consider the following stream:
+ * <pre>{@code
+ * List<String> l = Arrays.asList("A", "B", "C", "D");
+ * long count = l.stream().peek(System.out::println).count();
+ * }</pre>
+ * The number of elements covered by the stream source, a {@code List}, is
+ * known and the intermediate operation, {@code peek}, does not inject into
+ * or remove elements from the stream (as may be the case for
+ * {@code flatMap} or {@code filter} operations). Thus the count is the
+ * size of the {@code List} and there is no need to execute the pipeline
+ * and, as a side-effect, print out the list elements.
+ *
* @return the count of elements in this stream
*/
long count();
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/CountLargeTest.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/CountLargeTest.java Mon Mar 16 10:19:49 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,7 +24,7 @@
/**
* @test
* @summary Tests counting of streams containing Integer.MAX_VALUE + 1 elements
- * @bug 8031187
+ * @bug 8031187 8067969
*/
package org.openjdk.tests.java.util.stream;
@@ -41,30 +41,62 @@
static final long EXPECTED_LARGE_COUNT = 1L + Integer.MAX_VALUE;
public void testRefLarge() {
- long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
- .mapToObj(e -> null).count();
-
- assertEquals(count, EXPECTED_LARGE_COUNT);
+ // Test known sized stream
+ {
+ long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
+ .mapToObj(e -> null).count();
+ assertEquals(count, EXPECTED_LARGE_COUNT);
+ }
+ // Test unknown sized stream
+ {
+ long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
+ .mapToObj(e -> null).filter(e -> true).count();
+ assertEquals(count, EXPECTED_LARGE_COUNT);
+ }
}
public void testIntLarge() {
- long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
- .mapToInt(e -> 0).count();
-
- assertEquals(count, EXPECTED_LARGE_COUNT);
+ // Test known sized stream
+ {
+ long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
+ .mapToInt(e -> 0).count();
+ assertEquals(count, EXPECTED_LARGE_COUNT);
+ }
+ // Test unknown sized stream
+ {
+ long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
+ .mapToInt(e -> 0).filter(e -> true).count();
+ assertEquals(count, EXPECTED_LARGE_COUNT);
+ }
}
public void testLongLarge() {
- long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
- .count();
-
- assertEquals(count, EXPECTED_LARGE_COUNT);
+ // Test known sized stream
+ {
+ long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
+ .count();
+ assertEquals(count, EXPECTED_LARGE_COUNT);
+ }
+ // Test unknown sized stream
+ {
+ long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
+ .filter(e -> true).count();
+ assertEquals(count, EXPECTED_LARGE_COUNT);
+ }
}
public void testDoubleLarge() {
- long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
- .mapToDouble(e -> 0.0).count();
-
- assertEquals(count, EXPECTED_LARGE_COUNT);
+ // Test known sized stream
+ {
+ long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
+ .mapToDouble(e -> 0.0).count();
+ assertEquals(count, EXPECTED_LARGE_COUNT);
+ }
+ // Test unknown sized stream
+ {
+ long count = LongStream.range(0, EXPECTED_LARGE_COUNT)
+ .mapToDouble(e -> 0.0).filter(e -> true).count();
+ assertEquals(count, EXPECTED_LARGE_COUNT);
+ }
}
}
--- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/CountTest.java Sat Mar 14 09:38:52 2015 -0700
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/CountTest.java Mon Mar 16 10:19:49 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,11 +24,12 @@
/**
* @test
* @summary Tests counting of streams
- * @bug 8031187
+ * @bug 8031187 8067969
*/
package org.openjdk.tests.java.util.stream;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.DoubleStream;
import java.util.stream.DoubleStreamTestDataProvider;
import java.util.stream.IntStream;
@@ -52,6 +53,12 @@
terminal(Stream::count).
expectedResult(expectedCount).
exercise();
+
+ // Test with an unknown sized stream
+ withData(data).
+ terminal(s -> s.filter(e -> true), Stream::count).
+ expectedResult(expectedCount).
+ exercise();
}
@Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
@@ -62,6 +69,11 @@
terminal(IntStream::count).
expectedResult(expectedCount).
exercise();
+
+ withData(data).
+ terminal(s -> s.filter(e -> true), IntStream::count).
+ expectedResult(expectedCount).
+ exercise();
}
@Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class)
@@ -72,6 +84,11 @@
terminal(LongStream::count).
expectedResult(expectedCount).
exercise();
+
+ withData(data).
+ terminal(s -> s.filter(e -> true), LongStream::count).
+ expectedResult(expectedCount).
+ exercise();
}
@Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class)
@@ -82,5 +99,36 @@
terminal(DoubleStream::count).
expectedResult(expectedCount).
exercise();
+
+ withData(data).
+ terminal(s -> s.filter(e -> true), DoubleStream::count).
+ expectedResult(expectedCount).
+ exercise();
+ }
+
+ public void testNoEvaluationForSizedStream() {
+ {
+ AtomicInteger ai = new AtomicInteger();
+ Stream.of(1, 2, 3, 4).peek(e -> ai.getAndIncrement()).count();
+ assertEquals(ai.get(), 0);
+ }
+
+ {
+ AtomicInteger ai = new AtomicInteger();
+ IntStream.of(1, 2, 3, 4).peek(e -> ai.getAndIncrement()).count();
+ assertEquals(ai.get(), 0);
+ }
+
+ {
+ AtomicInteger ai = new AtomicInteger();
+ LongStream.of(1, 2, 3, 4).peek(e -> ai.getAndIncrement()).count();
+ assertEquals(ai.get(), 0);
+ }
+
+ {
+ AtomicInteger ai = new AtomicInteger();
+ DoubleStream.of(1, 2, 3, 4).peek(e -> ai.getAndIncrement()).count();
+ assertEquals(ai.get(), 0);
+ }
}
}