8071597: Add Stream dropWhile and takeWhile operations
Reviewed-by: briangoetz, smarks, chegar, forax
--- a/jdk/src/java.base/share/classes/java/util/stream/AbstractPipeline.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/AbstractPipeline.java Tue Jun 09 07:10:03 2015 +0100
@@ -489,15 +489,17 @@
@Override
@SuppressWarnings("unchecked")
- final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
+ final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
@SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) {
p = p.previousStage;
}
+
wrappedSink.begin(spliterator.getExactSizeIfKnown());
- p.forEachWithCancel(spliterator, wrappedSink);
+ boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
+ return cancelled;
}
@Override
@@ -602,8 +604,9 @@
*
* @param spliterator the spliterator to pull elements from
* @param sink the sink to push elements to
+ * @return true if the cancellation was requested
*/
- abstract void forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
+ abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
/**
* Make a node builder compatible with this stream shape.
--- a/jdk/src/java.base/share/classes/java/util/stream/DoublePipeline.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/DoublePipeline.java Tue Jun 09 07:10:03 2015 +0100
@@ -40,6 +40,7 @@
import java.util.function.DoubleToLongFunction;
import java.util.function.DoubleUnaryOperator;
import java.util.function.IntFunction;
+import java.util.function.LongPredicate;
import java.util.function.ObjDoubleConsumer;
import java.util.function.Supplier;
@@ -153,10 +154,12 @@
}
@Override
- final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
+ final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
Spliterator.OfDouble spl = adapt(spliterator);
DoubleConsumer adaptedSink = adapt(sink);
- do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
+ boolean cancelled;
+ do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
+ return cancelled;
}
@Override
@@ -353,6 +356,16 @@
}
@Override
+ public final DoubleStream takeWhile(DoublePredicate predicate) {
+ return WhileOps.makeTakeWhileDouble(this, predicate);
+ }
+
+ @Override
+ public final DoubleStream dropWhile(DoublePredicate predicate) {
+ return WhileOps.makeDropWhileDouble(this, predicate);
+ }
+
+ @Override
public final DoubleStream sorted() {
return SortedOps.makeDouble(this);
}
--- a/jdk/src/java.base/share/classes/java/util/stream/DoubleStream.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/DoubleStream.java Tue Jun 09 07:10:03 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,18 +24,13 @@
*/
package java.util.stream;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.Arrays;
-import java.util.Collection;
import java.util.DoubleSummaryStatistics;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.DoubleBinaryOperator;
import java.util.function.DoubleConsumer;
@@ -280,6 +275,137 @@
DoubleStream skip(long n);
/**
+ * Returns, if this stream is ordered, a stream consisting of the longest
+ * prefix of elements taken from this stream that match the given predicate.
+ * Otherwise returns, if this stream is unordered, a stream consisting of a
+ * subset of elements taken from this stream that match the given predicate.
+ *
+ * <p>If this stream is ordered then the longest prefix is a contiguous
+ * sequence of elements of this stream that match the given predicate. The
+ * first element of the sequence is the first element of this stream, and
+ * the element immediately following the last element of the sequence does
+ * not match the given predicate.
+ *
+ * <p>If this stream is unordered, and some (but not all) elements of this
+ * stream match the given predicate, then the behavior of this operation is
+ * nondeterministic; it is free to take any subset of matching elements
+ * (which includes the empty set).
+ *
+ * <p>Independent of whether this stream is ordered or unordered if all
+ * elements of this stream match the given predicate then this operation
+ * takes all elements (the result is the same is the input), or if no
+ * elements of the stream match the given predicate then no elements are
+ * taken (the result is an empty stream).
+ *
+ * <p>This is a <a href="package-summary.html#StreamOps">short-circuiting
+ * stateful intermediate operation</a>.
+ *
+ * @implSpec
+ * The default implementation obtains the {@link #spliterator() spliterator}
+ * of this stream, wraps that spliterator so as to support the semantics
+ * of this operation on traversal, and returns a new stream associated with
+ * the wrapped spliterator. The returned stream preserves the execution
+ * characteristics of this stream (namely parallel or sequential execution
+ * as per {@link #isParallel()}) but the wrapped spliterator may choose to
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
+ *
+ * @apiNote
+ * While {@code takeWhile()} is generally a cheap operation on sequential
+ * stream pipelines, it can be quite expensive on ordered parallel
+ * pipelines, since the operation is constrained to return not just any
+ * valid prefix, but the longest prefix of elements in the encounter order.
+ * Using an unordered stream source (such as
+ * {@link #generate(DoubleSupplier)}) or removing the ordering constraint
+ * with {@link #unordered()} may result in significant speedups of
+ * {@code takeWhile()} in parallel pipelines, if the semantics of your
+ * situation permit. If consistency with encounter order is required, and
+ * you are experiencing poor performance or memory utilization with
+ * {@code takeWhile()} in parallel pipelines, switching to sequential
+ * execution with {@link #sequential()} may improve performance.
+ *
+ * @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
+ * <a href="package-summary.html#Statelessness">stateless</a>
+ * predicate to apply to elements to determine the longest
+ * prefix of elements.
+ * @return the new stream
+ */
+ default DoubleStream takeWhile(DoublePredicate predicate) {
+ Objects.requireNonNull(predicate);
+ // Reuses the unordered spliterator, which, when encounter is present,
+ // is safe to use as long as it configured not to split
+ return StreamSupport.doubleStream(
+ new WhileOps.UnorderedWhileSpliterator.OfDouble.Taking(spliterator(), true, predicate),
+ isParallel()).onClose(this::close);
+ }
+
+ /**
+ * Returns, if this stream is ordered, a stream consisting of the remaining
+ * elements of this stream after dropping the longest prefix of elements
+ * that match the given predicate. Otherwise returns, if this stream is
+ * unordered, a stream consisting of the remaining elements of this stream
+ * after dropping a subset of elements that match the given predicate.
+ *
+ * <p>If this stream is ordered then the longest prefix is a contiguous
+ * sequence of elements of this stream that match the given predicate. The
+ * first element of the sequence is the first element of this stream, and
+ * the element immediately following the last element of the sequence does
+ * not match the given predicate.
+ *
+ * <p>If this stream is unordered, and some (but not all) elements of this
+ * stream match the given predicate, then the behavior of this operation is
+ * nondeterministic; it is free to drop any subset of matching elements
+ * (which includes the empty set).
+ *
+ * <p>Independent of whether this stream is ordered or unordered if all
+ * elements of this stream match the given predicate then this operation
+ * drops all elements (the result is an empty stream), or if no elements of
+ * the stream match the given predicate then no elements are dropped (the
+ * result is the same is the input).
+ *
+ * <p>This is a <a href="package-summary.html#StreamOps">stateful
+ * intermediate operation</a>.
+ *
+ * @implSpec
+ * The default implementation obtains the {@link #spliterator() spliterator}
+ * of this stream, wraps that spliterator so as to support the semantics
+ * of this operation on traversal, and returns a new stream associated with
+ * the wrapped spliterator. The returned stream preserves the execution
+ * characteristics of this stream (namely parallel or sequential execution
+ * as per {@link #isParallel()}) but the wrapped spliterator may choose to
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
+ *
+ * @apiNote
+ * While {@code dropWhile()} is generally a cheap operation on sequential
+ * stream pipelines, it can be quite expensive on ordered parallel
+ * pipelines, since the operation is constrained to return not just any
+ * valid prefix, but the longest prefix of elements in the encounter order.
+ * Using an unordered stream source (such as
+ * {@link #generate(DoubleSupplier)}) or removing the ordering constraint
+ * with {@link #unordered()} may result in significant speedups of
+ * {@code dropWhile()} in parallel pipelines, if the semantics of your
+ * situation permit. If consistency with encounter order is required, and
+ * you are experiencing poor performance or memory utilization with
+ * {@code dropWhile()} in parallel pipelines, switching to sequential
+ * execution with {@link #sequential()} may improve performance.
+ *
+ * @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
+ * <a href="package-summary.html#Statelessness">stateless</a>
+ * predicate to apply to elements to determine the longest
+ * prefix of elements.
+ * @return the new stream
+ */
+ default DoubleStream dropWhile(DoublePredicate predicate) {
+ Objects.requireNonNull(predicate);
+ // Reuses the unordered spliterator, which, when encounter is present,
+ // is safe to use as long as it configured not to split
+ return StreamSupport.doubleStream(
+ new WhileOps.UnorderedWhileSpliterator.OfDouble.Dropping(spliterator(), true, predicate),
+ isParallel()).onClose(this::close);
+ }
+
+ /**
* Performs an action for each element of this stream.
*
* <p>This is a <a href="package-summary.html#StreamOps">terminal
--- a/jdk/src/java.base/share/classes/java/util/stream/IntPipeline.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/IntPipeline.java Tue Jun 09 07:10:03 2015 +0100
@@ -156,10 +156,12 @@
}
@Override
- final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
+ final boolean forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
Spliterator.OfInt spl = adapt(spliterator);
IntConsumer adaptedSink = adapt(sink);
- do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
+ boolean cancelled;
+ do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
+ return cancelled;
}
@Override
@@ -387,6 +389,16 @@
}
@Override
+ public final IntStream takeWhile(IntPredicate predicate) {
+ return WhileOps.makeTakeWhileInt(this, predicate);
+ }
+
+ @Override
+ public final IntStream dropWhile(IntPredicate predicate) {
+ return WhileOps.makeDropWhileInt(this, predicate);
+ }
+
+ @Override
public final IntStream sorted() {
return SortedOps.makeInt(this);
}
--- a/jdk/src/java.base/share/classes/java/util/stream/IntStream.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/IntStream.java Tue Jun 09 07:10:03 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -273,6 +273,135 @@
IntStream skip(long n);
/**
+ * Returns, if this stream is ordered, a stream consisting of the longest
+ * prefix of elements taken from this stream that match the given predicate.
+ * Otherwise returns, if this stream is unordered, a stream consisting of a
+ * subset of elements taken from this stream that match the given predicate.
+ *
+ * <p>If this stream is ordered then the longest prefix is a contiguous
+ * sequence of elements of this stream that match the given predicate. The
+ * first element of the sequence is the first element of this stream, and
+ * the element immediately following the last element of the sequence does
+ * not match the given predicate.
+ *
+ * <p>If this stream is unordered, and some (but not all) elements of this
+ * stream match the given predicate, then the behavior of this operation is
+ * nondeterministic; it is free to take any subset of matching elements
+ * (which includes the empty set).
+ *
+ * <p>Independent of whether this stream is ordered or unordered if all
+ * elements of this stream match the given predicate then this operation
+ * takes all elements (the result is the same is the input), or if no
+ * elements of the stream match the given predicate then no elements are
+ * taken (the result is an empty stream).
+ *
+ * <p>This is a <a href="package-summary.html#StreamOps">short-circuiting
+ * stateful intermediate operation</a>.
+ *
+ * @implSpec
+ * The default implementation obtains the {@link #spliterator() spliterator}
+ * of this stream, wraps that spliterator so as to support the semantics
+ * of this operation on traversal, and returns a new stream associated with
+ * the wrapped spliterator. The returned stream preserves the execution
+ * characteristics of this stream (namely parallel or sequential execution
+ * as per {@link #isParallel()}) but the wrapped spliterator may choose to
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
+ *
+ * @apiNote
+ * While {@code takeWhile()} is generally a cheap operation on sequential
+ * stream pipelines, it can be quite expensive on ordered parallel
+ * pipelines, since the operation is constrained to return not just any
+ * valid prefix, but the longest prefix of elements in the encounter order.
+ * Using an unordered stream source (such as {@link #generate(IntSupplier)})
+ * or removing the ordering constraint with {@link #unordered()} may result
+ * in significant speedups of {@code takeWhile()} in parallel pipelines, if
+ * the semantics of your situation permit. If consistency with encounter
+ * order is required, and you are experiencing poor performance or memory
+ * utilization with {@code takeWhile()} in parallel pipelines, switching to
+ * sequential execution with {@link #sequential()} may improve performance.
+ *
+ * @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
+ * <a href="package-summary.html#Statelessness">stateless</a>
+ * predicate to apply to elements to determine the longest
+ * prefix of elements.
+ * @return the new stream
+ */
+ default IntStream takeWhile(IntPredicate predicate) {
+ Objects.requireNonNull(predicate);
+ // Reuses the unordered spliterator, which, when encounter is present,
+ // is safe to use as long as it configured not to split
+ return StreamSupport.intStream(
+ new WhileOps.UnorderedWhileSpliterator.OfInt.Taking(spliterator(), true, predicate),
+ isParallel()).onClose(this::close);
+ }
+
+ /**
+ * Returns, if this stream is ordered, a stream consisting of the remaining
+ * elements of this stream after dropping the longest prefix of elements
+ * that match the given predicate. Otherwise returns, if this stream is
+ * unordered, a stream consisting of the remaining elements of this stream
+ * after dropping a subset of elements that match the given predicate.
+ *
+ * <p>If this stream is ordered then the longest prefix is a contiguous
+ * sequence of elements of this stream that match the given predicate. The
+ * first element of the sequence is the first element of this stream, and
+ * the element immediately following the last element of the sequence does
+ * not match the given predicate.
+ *
+ * <p>If this stream is unordered, and some (but not all) elements of this
+ * stream match the given predicate, then the behavior of this operation is
+ * nondeterministic; it is free to drop any subset of matching elements
+ * (which includes the empty set).
+ *
+ * <p>Independent of whether this stream is ordered or unordered if all
+ * elements of this stream match the given predicate then this operation
+ * drops all elements (the result is an empty stream), or if no elements of
+ * the stream match the given predicate then no elements are dropped (the
+ * result is the same is the input).
+ *
+ * <p>This is a <a href="package-summary.html#StreamOps">stateful
+ * intermediate operation</a>.
+ *
+ * @implSpec
+ * The default implementation obtains the {@link #spliterator() spliterator}
+ * of this stream, wraps that spliterator so as to support the semantics
+ * of this operation on traversal, and returns a new stream associated with
+ * the wrapped spliterator. The returned stream preserves the execution
+ * characteristics of this stream (namely parallel or sequential execution
+ * as per {@link #isParallel()}) but the wrapped spliterator may choose to
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
+ *
+ * @apiNote
+ * While {@code dropWhile()} is generally a cheap operation on sequential
+ * stream pipelines, it can be quite expensive on ordered parallel
+ * pipelines, since the operation is constrained to return not just any
+ * valid prefix, but the longest prefix of elements in the encounter order.
+ * Using an unordered stream source (such as {@link #generate(IntSupplier)})
+ * or removing the ordering constraint with {@link #unordered()} may result
+ * in significant speedups of {@code dropWhile()} in parallel pipelines, if
+ * the semantics of your situation permit. If consistency with encounter
+ * order is required, and you are experiencing poor performance or memory
+ * utilization with {@code dropWhile()} in parallel pipelines, switching to
+ * sequential execution with {@link #sequential()} may improve performance.
+ *
+ * @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
+ * <a href="package-summary.html#Statelessness">stateless</a>
+ * predicate to apply to elements to determine the longest
+ * prefix of elements.
+ * @return the new stream
+ */
+ default IntStream dropWhile(IntPredicate predicate) {
+ Objects.requireNonNull(predicate);
+ // Reuses the unordered spliterator, which, when encounter is present,
+ // is safe to use as long as it configured not to split
+ return StreamSupport.intStream(
+ new WhileOps.UnorderedWhileSpliterator.OfInt.Dropping(spliterator(), true, predicate),
+ isParallel()).onClose(this::close);
+ }
+
+ /**
* Performs an action for each element of this stream.
*
* <p>This is a <a href="package-summary.html#StreamOps">terminal
--- a/jdk/src/java.base/share/classes/java/util/stream/LongPipeline.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/LongPipeline.java Tue Jun 09 07:10:03 2015 +0100
@@ -154,10 +154,12 @@
}
@Override
- final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
+ final boolean forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
Spliterator.OfLong spl = adapt(spliterator);
LongConsumer adaptedSink = adapt(sink);
- do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
+ boolean cancelled;
+ do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
+ return cancelled;
}
@Override
@@ -368,6 +370,16 @@
}
@Override
+ public final LongStream takeWhile(LongPredicate predicate) {
+ return WhileOps.makeTakeWhileLong(this, predicate);
+ }
+
+ @Override
+ public final LongStream dropWhile(LongPredicate predicate) {
+ return WhileOps.makeDropWhileLong(this, predicate);
+ }
+
+ @Override
public final LongStream sorted() {
return SortedOps.makeLong(this);
}
--- a/jdk/src/java.base/share/classes/java/util/stream/LongStream.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/LongStream.java Tue Jun 09 07:10:03 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,11 +24,7 @@
*/
package java.util.stream;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.Arrays;
-import java.util.Collection;
import java.util.LongSummaryStatistics;
import java.util.Objects;
import java.util.OptionalDouble;
@@ -36,7 +32,6 @@
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongBinaryOperator;
@@ -278,6 +273,137 @@
LongStream skip(long n);
/**
+ * Returns, if this stream is ordered, a stream consisting of the longest
+ * prefix of elements taken from this stream that match the given predicate.
+ * Otherwise returns, if this stream is unordered, a stream consisting of a
+ * subset of elements taken from this stream that match the given predicate.
+ *
+ * <p>If this stream is ordered then the longest prefix is a contiguous
+ * sequence of elements of this stream that match the given predicate. The
+ * first element of the sequence is the first element of this stream, and
+ * the element immediately following the last element of the sequence does
+ * not match the given predicate.
+ *
+ * <p>If this stream is unordered, and some (but not all) elements of this
+ * stream match the given predicate, then the behavior of this operation is
+ * nondeterministic; it is free to take any subset of matching elements
+ * (which includes the empty set).
+ *
+ * <p>Independent of whether this stream is ordered or unordered if all
+ * elements of this stream match the given predicate then this operation
+ * takes all elements (the result is the same is the input), or if no
+ * elements of the stream match the given predicate then no elements are
+ * taken (the result is an empty stream).
+ *
+ * <p>This is a <a href="package-summary.html#StreamOps">short-circuiting
+ * stateful intermediate operation</a>.
+ *
+ * @implSpec
+ * The default implementation obtains the {@link #spliterator() spliterator}
+ * of this stream, wraps that spliterator so as to support the semantics
+ * of this operation on traversal, and returns a new stream associated with
+ * the wrapped spliterator. The returned stream preserves the execution
+ * characteristics of this stream (namely parallel or sequential execution
+ * as per {@link #isParallel()}) but the wrapped spliterator may choose to
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
+ *
+ * @apiNote
+ * While {@code takeWhile()} is generally a cheap operation on sequential
+ * stream pipelines, it can be quite expensive on ordered parallel
+ * pipelines, since the operation is constrained to return not just any
+ * valid prefix, but the longest prefix of elements in the encounter order.
+ * Using an unordered stream source (such as
+ * {@link #generate(LongSupplier)}) or removing the ordering constraint with
+ * {@link #unordered()} may result in significant speedups of
+ * {@code takeWhile()} in parallel pipelines, if the semantics of your
+ * situation permit. If consistency with encounter order is required, and
+ * you are experiencing poor performance or memory utilization with
+ * {@code takeWhile()} in parallel pipelines, switching to sequential
+ * execution with {@link #sequential()} may improve performance.
+ *
+ * @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
+ * <a href="package-summary.html#Statelessness">stateless</a>
+ * predicate to apply to elements to determine the longest
+ * prefix of elements.
+ * @return the new stream
+ */
+ default LongStream takeWhile(LongPredicate predicate) {
+ Objects.requireNonNull(predicate);
+ // Reuses the unordered spliterator, which, when encounter is present,
+ // is safe to use as long as it configured not to split
+ return StreamSupport.longStream(
+ new WhileOps.UnorderedWhileSpliterator.OfLong.Taking(spliterator(), true, predicate),
+ isParallel()).onClose(this::close);
+ }
+
+ /**
+ * Returns, if this stream is ordered, a stream consisting of the remaining
+ * elements of this stream after dropping the longest prefix of elements
+ * that match the given predicate. Otherwise returns, if this stream is
+ * unordered, a stream consisting of the remaining elements of this stream
+ * after dropping a subset of elements that match the given predicate.
+ *
+ * <p>If this stream is ordered then the longest prefix is a contiguous
+ * sequence of elements of this stream that match the given predicate. The
+ * first element of the sequence is the first element of this stream, and
+ * the element immediately following the last element of the sequence does
+ * not match the given predicate.
+ *
+ * <p>If this stream is unordered, and some (but not all) elements of this
+ * stream match the given predicate, then the behavior of this operation is
+ * nondeterministic; it is free to drop any subset of matching elements
+ * (which includes the empty set).
+ *
+ * <p>Independent of whether this stream is ordered or unordered if all
+ * elements of this stream match the given predicate then this operation
+ * drops all elements (the result is an empty stream), or if no elements of
+ * the stream match the given predicate then no elements are dropped (the
+ * result is the same is the input).
+ *
+ * <p>This is a <a href="package-summary.html#StreamOps">stateful
+ * intermediate operation</a>.
+ *
+ * @implSpec
+ * The default implementation obtains the {@link #spliterator() spliterator}
+ * of this stream, wraps that spliterator so as to support the semantics
+ * of this operation on traversal, and returns a new stream associated with
+ * the wrapped spliterator. The returned stream preserves the execution
+ * characteristics of this stream (namely parallel or sequential execution
+ * as per {@link #isParallel()}) but the wrapped spliterator may choose to
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
+ *
+ * @apiNote
+ * While {@code dropWhile()} is generally a cheap operation on sequential
+ * stream pipelines, it can be quite expensive on ordered parallel
+ * pipelines, since the operation is constrained to return not just any
+ * valid prefix, but the longest prefix of elements in the encounter order.
+ * Using an unordered stream source (such as
+ * {@link #generate(LongSupplier)}) or removing the ordering constraint with
+ * {@link #unordered()} may result in significant speedups of
+ * {@code dropWhile()} in parallel pipelines, if the semantics of your
+ * situation permit. If consistency with encounter order is required, and
+ * you are experiencing poor performance or memory utilization with
+ * {@code dropWhile()} in parallel pipelines, switching to sequential
+ * execution with {@link #sequential()} may improve performance.
+ *
+ * @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
+ * <a href="package-summary.html#Statelessness">stateless</a>
+ * predicate to apply to elements to determine the longest
+ * prefix of elements.
+ * @return the new stream
+ */
+ default LongStream dropWhile(LongPredicate predicate) {
+ Objects.requireNonNull(predicate);
+ // Reuses the unordered spliterator, which, when encounter is present,
+ // is safe to use as long as it configured not to split
+ return StreamSupport.longStream(
+ new WhileOps.UnorderedWhileSpliterator.OfLong.Dropping(spliterator(), true, predicate),
+ isParallel()).onClose(this::close);
+ }
+
+ /**
* Performs an action for each element of this stream.
*
* <p>This is a <a href="package-summary.html#StreamOps">terminal
--- a/jdk/src/java.base/share/classes/java/util/stream/Node.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/Node.java Tue Jun 09 07:10:03 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -125,7 +125,11 @@
Node.Builder<T> nodeBuilder = Nodes.builder(size, generator);
nodeBuilder.begin(size);
for (int i = 0; i < from && spliterator.tryAdvance(e -> { }); i++) { }
- for (int i = 0; (i < size) && spliterator.tryAdvance(nodeBuilder); i++) { }
+ if (to == count()) {
+ spliterator.forEachRemaining(nodeBuilder);
+ } else {
+ for (int i = 0; i < size && spliterator.tryAdvance(nodeBuilder); i++) { }
+ }
nodeBuilder.end();
return nodeBuilder.build();
}
@@ -360,7 +364,11 @@
Node.Builder.OfInt nodeBuilder = Nodes.intBuilder(size);
nodeBuilder.begin(size);
for (int i = 0; i < from && spliterator.tryAdvance((IntConsumer) e -> { }); i++) { }
- for (int i = 0; (i < size) && spliterator.tryAdvance((IntConsumer) nodeBuilder); i++) { }
+ if (to == count()) {
+ spliterator.forEachRemaining((IntConsumer) nodeBuilder);
+ } else {
+ for (int i = 0; i < size && spliterator.tryAdvance((IntConsumer) nodeBuilder); i++) { }
+ }
nodeBuilder.end();
return nodeBuilder.build();
}
@@ -433,7 +441,11 @@
Node.Builder.OfLong nodeBuilder = Nodes.longBuilder(size);
nodeBuilder.begin(size);
for (int i = 0; i < from && spliterator.tryAdvance((LongConsumer) e -> { }); i++) { }
- for (int i = 0; (i < size) && spliterator.tryAdvance((LongConsumer) nodeBuilder); i++) { }
+ if (to == count()) {
+ spliterator.forEachRemaining((LongConsumer) nodeBuilder);
+ } else {
+ for (int i = 0; i < size && spliterator.tryAdvance((LongConsumer) nodeBuilder); i++) { }
+ }
nodeBuilder.end();
return nodeBuilder.build();
}
@@ -508,7 +520,11 @@
Node.Builder.OfDouble nodeBuilder = Nodes.doubleBuilder(size);
nodeBuilder.begin(size);
for (int i = 0; i < from && spliterator.tryAdvance((DoubleConsumer) e -> { }); i++) { }
- for (int i = 0; (i < size) && spliterator.tryAdvance((DoubleConsumer) nodeBuilder); i++) { }
+ if (to == count()) {
+ spliterator.forEachRemaining((DoubleConsumer) nodeBuilder);
+ } else {
+ for (int i = 0; i < size && spliterator.tryAdvance((DoubleConsumer) nodeBuilder); i++) { }
+ }
nodeBuilder.end();
return nodeBuilder.build();
}
--- a/jdk/src/java.base/share/classes/java/util/stream/Nodes.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/Nodes.java Tue Jun 09 07:10:03 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -69,6 +69,14 @@
private static final Node.OfLong EMPTY_LONG_NODE = new EmptyNode.OfLong();
private static final Node.OfDouble EMPTY_DOUBLE_NODE = new EmptyNode.OfDouble();
+ /**
+ * @return an array generator for an array whose elements are of type T.
+ */
+ @SuppressWarnings("unchecked")
+ static <T> IntFunction<T[]> castingArray() {
+ return size -> (T[]) new Object[size];
+ }
+
// General shape-based node creation methods
/**
--- a/jdk/src/java.base/share/classes/java/util/stream/PipelineHelper.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/PipelineHelper.java Tue Jun 09 07:10:03 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -136,8 +136,9 @@
*
* @param wrappedSink the destination {@code Sink}
* @param spliterator the source {@code Spliterator}
+ * @return true if the cancellation was requested
*/
- abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
+ abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
/**
* Takes a {@code Sink} that accepts elements of the output type of the
--- a/jdk/src/java.base/share/classes/java/util/stream/ReferencePipeline.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/ReferencePipeline.java Tue Jun 09 07:10:03 2015 +0100
@@ -122,8 +122,10 @@
}
@Override
- final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
- do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
+ final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
+ boolean cancelled;
+ do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
+ return cancelled;
}
@Override
@@ -411,6 +413,16 @@
return SliceOps.makeRef(this, n, -1);
}
+ @Override
+ public final Stream<P_OUT> takeWhile(Predicate<? super P_OUT> predicate) {
+ return WhileOps.makeTakeWhileRef(this, predicate);
+ }
+
+ @Override
+ public final Stream<P_OUT> dropWhile(Predicate<? super P_OUT> predicate) {
+ return WhileOps.makeDropWhileRef(this, predicate);
+ }
+
// Terminal operations from Stream
@Override
--- a/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java Tue Jun 09 07:10:03 2015 +0100
@@ -96,11 +96,6 @@
}
}
- @SuppressWarnings("unchecked")
- private static <T> IntFunction<T[]> castingArray() {
- return size -> (T[]) new Object[size];
- }
-
/**
* Appends a "slice" operation to the provided stream. The slice operation
* may be may be skip-only, limit-only, or skip-and-limit.
@@ -151,7 +146,7 @@
// cancellation will be more aggressive cancelling later tasks
// if the target slice size has been reached from a given task,
// cancellation should also clear local results if any
- return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit).
+ return new SliceTask<>(this, helper, spliterator, Nodes.castingArray(), skip, limit).
invoke().spliterator();
}
}
--- a/jdk/src/java.base/share/classes/java/util/stream/Stream.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/src/java.base/share/classes/java/util/stream/Stream.java Tue Jun 09 07:10:03 2015 +0100
@@ -24,7 +24,6 @@
*/
package java.util.stream;
-import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
@@ -481,6 +480,135 @@
Stream<T> skip(long n);
/**
+ * Returns, if this stream is ordered, a stream consisting of the longest
+ * prefix of elements taken from this stream that match the given predicate.
+ * Otherwise returns, if this stream is unordered, a stream consisting of a
+ * subset of elements taken from this stream that match the given predicate.
+ *
+ * <p>If this stream is ordered then the longest prefix is a contiguous
+ * sequence of elements of this stream that match the given predicate. The
+ * first element of the sequence is the first element of this stream, and
+ * the element immediately following the last element of the sequence does
+ * not match the given predicate.
+ *
+ * <p>If this stream is unordered, and some (but not all) elements of this
+ * stream match the given predicate, then the behavior of this operation is
+ * nondeterministic; it is free to take any subset of matching elements
+ * (which includes the empty set).
+ *
+ * <p>Independent of whether this stream is ordered or unordered if all
+ * elements of this stream match the given predicate then this operation
+ * takes all elements (the result is the same is the input), or if no
+ * elements of the stream match the given predicate then no elements are
+ * taken (the result is an empty stream).
+ *
+ * <p>This is a <a href="package-summary.html#StreamOps">short-circuiting
+ * stateful intermediate operation</a>.
+ *
+ * @implSpec
+ * The default implementation obtains the {@link #spliterator() spliterator}
+ * of this stream, wraps that spliterator so as to support the semantics
+ * of this operation on traversal, and returns a new stream associated with
+ * the wrapped spliterator. The returned stream preserves the execution
+ * characteristics of this stream (namely parallel or sequential execution
+ * as per {@link #isParallel()}) but the wrapped spliterator may choose to
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
+ *
+ * @apiNote
+ * While {@code takeWhile()} is generally a cheap operation on sequential
+ * stream pipelines, it can be quite expensive on ordered parallel
+ * pipelines, since the operation is constrained to return not just any
+ * valid prefix, but the longest prefix of elements in the encounter order.
+ * Using an unordered stream source (such as {@link #generate(Supplier)}) or
+ * removing the ordering constraint with {@link #unordered()} may result in
+ * significant speedups of {@code takeWhile()} in parallel pipelines, if the
+ * semantics of your situation permit. If consistency with encounter order
+ * is required, and you are experiencing poor performance or memory
+ * utilization with {@code takeWhile()} in parallel pipelines, switching to
+ * sequential execution with {@link #sequential()} may improve performance.
+ *
+ * @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
+ * <a href="package-summary.html#Statelessness">stateless</a>
+ * predicate to apply to elements to determine the longest
+ * prefix of elements.
+ * @return the new stream
+ */
+ default Stream<T> takeWhile(Predicate<? super T> predicate) {
+ Objects.requireNonNull(predicate);
+ // Reuses the unordered spliterator, which, when encounter is present,
+ // is safe to use as long as it configured not to split
+ return StreamSupport.stream(
+ new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<>(spliterator(), true, predicate),
+ isParallel()).onClose(this::close);
+ }
+
+ /**
+ * Returns, if this stream is ordered, a stream consisting of the remaining
+ * elements of this stream after dropping the longest prefix of elements
+ * that match the given predicate. Otherwise returns, if this stream is
+ * unordered, a stream consisting of the remaining elements of this stream
+ * after dropping a subset of elements that match the given predicate.
+ *
+ * <p>If this stream is ordered then the longest prefix is a contiguous
+ * sequence of elements of this stream that match the given predicate. The
+ * first element of the sequence is the first element of this stream, and
+ * the element immediately following the last element of the sequence does
+ * not match the given predicate.
+ *
+ * <p>If this stream is unordered, and some (but not all) elements of this
+ * stream match the given predicate, then the behavior of this operation is
+ * nondeterministic; it is free to drop any subset of matching elements
+ * (which includes the empty set).
+ *
+ * <p>Independent of whether this stream is ordered or unordered if all
+ * elements of this stream match the given predicate then this operation
+ * drops all elements (the result is an empty stream), or if no elements of
+ * the stream match the given predicate then no elements are dropped (the
+ * result is the same is the input).
+ *
+ * <p>This is a <a href="package-summary.html#StreamOps">stateful
+ * intermediate operation</a>.
+ *
+ * @implSpec
+ * The default implementation obtains the {@link #spliterator() spliterator}
+ * of this stream, wraps that spliterator so as to support the semantics
+ * of this operation on traversal, and returns a new stream associated with
+ * the wrapped spliterator. The returned stream preserves the execution
+ * characteristics of this stream (namely parallel or sequential execution
+ * as per {@link #isParallel()}) but the wrapped spliterator may choose to
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
+ *
+ * @apiNote
+ * While {@code dropWhile()} is generally a cheap operation on sequential
+ * stream pipelines, it can be quite expensive on ordered parallel
+ * pipelines, since the operation is constrained to return not just any
+ * valid prefix, but the longest prefix of elements in the encounter order.
+ * Using an unordered stream source (such as {@link #generate(Supplier)}) or
+ * removing the ordering constraint with {@link #unordered()} may result in
+ * significant speedups of {@code dropWhile()} in parallel pipelines, if the
+ * semantics of your situation permit. If consistency with encounter order
+ * is required, and you are experiencing poor performance or memory
+ * utilization with {@code dropWhile()} in parallel pipelines, switching to
+ * sequential execution with {@link #sequential()} may improve performance.
+ *
+ * @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
+ * <a href="package-summary.html#Statelessness">stateless</a>
+ * predicate to apply to elements to determine the longest
+ * prefix of elements.
+ * @return the new stream
+ */
+ default Stream<T> dropWhile(Predicate<? super T> predicate) {
+ Objects.requireNonNull(predicate);
+ // Reuses the unordered spliterator, which, when encounter is present,
+ // is safe to use as long as it configured not to split
+ return StreamSupport.stream(
+ new WhileOps.UnorderedWhileSpliterator.OfRef.Dropping<>(spliterator(), true, predicate),
+ isParallel()).onClose(this::close);
+ }
+
+ /**
* Performs an action for each element of this stream.
*
* <p>This is a <a href="package-summary.html#StreamOps">terminal
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.base/share/classes/java/util/stream/WhileOps.java Tue Jun 09 07:10:03 2015 +0100
@@ -0,0 +1,1394 @@
+/*
+ * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Spliterator;
+import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.DoubleConsumer;
+import java.util.function.DoublePredicate;
+import java.util.function.IntConsumer;
+import java.util.function.IntFunction;
+import java.util.function.IntPredicate;
+import java.util.function.LongConsumer;
+import java.util.function.LongPredicate;
+import java.util.function.Predicate;
+
+/**
+ * Factory for instances of a takeWhile and dropWhile operations
+ * that produce subsequences of their input stream.
+ *
+ * @since 1.9
+ */
+final class WhileOps {
+
+ static final int TAKE_FLAGS = StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT;
+
+ static final int DROP_FLAGS = StreamOpFlag.NOT_SIZED;
+
+ /**
+ * Appends a "takeWhile" operation to the provided Stream.
+ *
+ * @param <T> the type of both input and output elements
+ * @param upstream a reference stream with element type T
+ * @param predicate the predicate that returns false to halt taking.
+ */
+ static <T> Stream<T> makeTakeWhileRef(AbstractPipeline<?, T, ?> upstream,
+ Predicate<? super T> predicate) {
+ Objects.requireNonNull(predicate);
+ return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, TAKE_FLAGS) {
+ @Override
+ <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+ return opEvaluateParallel(helper, spliterator, Nodes.castingArray())
+ .spliterator();
+ }
+ else {
+ return new UnorderedWhileSpliterator.OfRef.Taking<>(
+ helper.wrapSpliterator(spliterator), false, predicate);
+ }
+ }
+
+ @Override
+ <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<T[]> generator) {
+ return new TakeWhileTask<>(this, helper, spliterator, generator)
+ .invoke();
+ }
+
+ @Override
+ Sink<T> opWrapSink(int flags, Sink<T> sink) {
+ return new Sink.ChainedReference<T, T>(sink) {
+ boolean take = true;
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(-1);
+ }
+
+ @Override
+ public void accept(T t) {
+ if (take = predicate.test(t)) {
+ downstream.accept(t);
+ }
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return !take || downstream.cancellationRequested();
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Appends a "takeWhile" operation to the provided IntStream.
+ *
+ * @param upstream a reference stream with element type T
+ * @param predicate the predicate that returns false to halt taking.
+ */
+ static IntStream makeTakeWhileInt(AbstractPipeline<?, Integer, ?> upstream,
+ IntPredicate predicate) {
+ Objects.requireNonNull(predicate);
+ return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, TAKE_FLAGS) {
+ @Override
+ <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+ return opEvaluateParallel(helper, spliterator, Integer[]::new)
+ .spliterator();
+ }
+ else {
+ return new UnorderedWhileSpliterator.OfInt.Taking(
+ (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate);
+ }
+ }
+
+ @Override
+ <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<Integer[]> generator) {
+ return new TakeWhileTask<>(this, helper, spliterator, generator)
+ .invoke();
+ }
+
+ @Override
+ Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
+ return new Sink.ChainedInt<Integer>(sink) {
+ boolean take = true;
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(-1);
+ }
+
+ @Override
+ public void accept(int t) {
+ if (take = predicate.test(t)) {
+ downstream.accept(t);
+ }
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return !take || downstream.cancellationRequested();
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Appends a "takeWhile" operation to the provided LongStream.
+ *
+ * @param upstream a reference stream with element type T
+ * @param predicate the predicate that returns false to halt taking.
+ */
+ static LongStream makeTakeWhileLong(AbstractPipeline<?, Long, ?> upstream,
+ LongPredicate predicate) {
+ Objects.requireNonNull(predicate);
+ return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, TAKE_FLAGS) {
+ @Override
+ <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+ return opEvaluateParallel(helper, spliterator, Long[]::new)
+ .spliterator();
+ }
+ else {
+ return new UnorderedWhileSpliterator.OfLong.Taking(
+ (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate);
+ }
+ }
+
+ @Override
+ <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<Long[]> generator) {
+ return new TakeWhileTask<>(this, helper, spliterator, generator)
+ .invoke();
+ }
+
+ @Override
+ Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
+ return new Sink.ChainedLong<Long>(sink) {
+ boolean take = true;
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(-1);
+ }
+
+ @Override
+ public void accept(long t) {
+ if (take = predicate.test(t)) {
+ downstream.accept(t);
+ }
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return !take || downstream.cancellationRequested();
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Appends a "takeWhile" operation to the provided DoubleStream.
+ *
+ * @param upstream a reference stream with element type T
+ * @param predicate the predicate that returns false to halt taking.
+ */
+ static DoubleStream makeTakeWhileDouble(AbstractPipeline<?, Double, ?> upstream,
+ DoublePredicate predicate) {
+ Objects.requireNonNull(predicate);
+ return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, TAKE_FLAGS) {
+ @Override
+ <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+ return opEvaluateParallel(helper, spliterator, Double[]::new)
+ .spliterator();
+ }
+ else {
+ return new UnorderedWhileSpliterator.OfDouble.Taking(
+ (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate);
+ }
+ }
+
+ @Override
+ <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<Double[]> generator) {
+ return new TakeWhileTask<>(this, helper, spliterator, generator)
+ .invoke();
+ }
+
+ @Override
+ Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
+ return new Sink.ChainedDouble<Double>(sink) {
+ boolean take = true;
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(-1);
+ }
+
+ @Override
+ public void accept(double t) {
+ if (take = predicate.test(t)) {
+ downstream.accept(t);
+ }
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return !take || downstream.cancellationRequested();
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * A specialization for the dropWhile operation that controls if
+ * elements to be dropped are counted and passed downstream.
+ * <p>
+ * This specialization is utilized by the {@link TakeWhileTask} for
+ * pipelines that are ordered. In such cases elements cannot be dropped
+ * until all elements have been collected.
+ *
+ * @param <T> the type of both input and output elements
+ */
+ interface DropWhileOp<T> {
+ /**
+ * Accepts a {@code Sink} which will receive the results of this
+ * dropWhile operation, and return a {@code DropWhileSink} which
+ * accepts
+ * elements and which performs the dropWhile operation passing the
+ * results to the provided {@code Sink}.
+ *
+ * @param sink sink to which elements should be sent after processing
+ * @param retainAndCountDroppedElements true if elements to be dropped
+ * are counted and passed to the sink, otherwise such elements
+ * are actually dropped and not passed to the sink.
+ * @return a dropWhile sink
+ */
+ DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements);
+ }
+
+ /**
+ * A specialization for a dropWhile sink.
+ *
+ * @param <T> the type of both input and output elements
+ */
+ interface DropWhileSink<T> extends Sink<T> {
+ /**
+ * @return the could of elements that would have been dropped and
+ * instead were passed downstream.
+ */
+ long getDropCount();
+ }
+
+ /**
+ * Appends a "dropWhile" operation to the provided Stream.
+ *
+ * @param <T> the type of both input and output elements
+ * @param upstream a reference stream with element type T
+ * @param predicate the predicate that returns false to halt dropping.
+ */
+ static <T> Stream<T> makeDropWhileRef(AbstractPipeline<?, T, ?> upstream,
+ Predicate<? super T> predicate) {
+ Objects.requireNonNull(predicate);
+
+ class Op extends ReferencePipeline.StatefulOp<T, T> implements DropWhileOp<T> {
+ public Op(AbstractPipeline<?, T, ?> upstream, StreamShape inputShape, int opFlags) {
+ super(upstream, inputShape, opFlags);
+ }
+
+ @Override
+ <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+ return opEvaluateParallel(helper, spliterator, Nodes.castingArray())
+ .spliterator();
+ }
+ else {
+ return new UnorderedWhileSpliterator.OfRef.Dropping<>(
+ helper.wrapSpliterator(spliterator), false, predicate);
+ }
+ }
+
+ @Override
+ <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<T[]> generator) {
+ return new DropWhileTask<>(this, helper, spliterator, generator)
+ .invoke();
+ }
+
+ @Override
+ Sink<T> opWrapSink(int flags, Sink<T> sink) {
+ return opWrapSink(sink, false);
+ }
+
+ public DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements) {
+ class OpSink extends Sink.ChainedReference<T, T> implements DropWhileSink<T> {
+ long dropCount;
+ boolean take;
+
+ OpSink() {
+ super(sink);
+ }
+
+ @Override
+ public void accept(T t) {
+ boolean takeElement = take || (take = !predicate.test(t));
+
+ // If ordered and element is dropped increment index
+ // for possible future truncation
+ if (retainAndCountDroppedElements && !takeElement)
+ dropCount++;
+
+ // If ordered need to process element, otherwise
+ // skip if element is dropped
+ if (retainAndCountDroppedElements || takeElement)
+ downstream.accept(t);
+ }
+
+ @Override
+ public long getDropCount() {
+ return dropCount;
+ }
+ }
+ return new OpSink();
+ }
+ }
+ return new Op(upstream, StreamShape.REFERENCE, DROP_FLAGS);
+ }
+
+ /**
+ * Appends a "dropWhile" operation to the provided IntStream.
+ *
+ * @param upstream a reference stream with element type T
+ * @param predicate the predicate that returns false to halt dropping.
+ */
+ static IntStream makeDropWhileInt(AbstractPipeline<?, Integer, ?> upstream,
+ IntPredicate predicate) {
+ Objects.requireNonNull(predicate);
+ class Op extends IntPipeline.StatefulOp<Integer> implements DropWhileOp<Integer> {
+ public Op(AbstractPipeline<?, Integer, ?> upstream, StreamShape inputShape, int opFlags) {
+ super(upstream, inputShape, opFlags);
+ }
+
+ @Override
+ <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+ return opEvaluateParallel(helper, spliterator, Integer[]::new)
+ .spliterator();
+ }
+ else {
+ return new UnorderedWhileSpliterator.OfInt.Dropping(
+ (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate);
+ }
+ }
+
+ @Override
+ <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<Integer[]> generator) {
+ return new DropWhileTask<>(this, helper, spliterator, generator)
+ .invoke();
+ }
+
+ @Override
+ Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
+ return opWrapSink(sink, false);
+ }
+
+ public DropWhileSink<Integer> opWrapSink(Sink<Integer> sink, boolean retainAndCountDroppedElements) {
+ class OpSink extends Sink.ChainedInt<Integer> implements DropWhileSink<Integer> {
+ long dropCount;
+ boolean take;
+
+ OpSink() {
+ super(sink);
+ }
+
+ @Override
+ public void accept(int t) {
+ boolean takeElement = take || (take = !predicate.test(t));
+
+ // If ordered and element is dropped increment index
+ // for possible future truncation
+ if (retainAndCountDroppedElements && !takeElement)
+ dropCount++;
+
+ // If ordered need to process element, otherwise
+ // skip if element is dropped
+ if (retainAndCountDroppedElements || takeElement)
+ downstream.accept(t);
+ }
+
+ @Override
+ public long getDropCount() {
+ return dropCount;
+ }
+ }
+ return new OpSink();
+ }
+ }
+ return new Op(upstream, StreamShape.INT_VALUE, DROP_FLAGS);
+ }
+
+ /**
+ * Appends a "dropWhile" operation to the provided LongStream.
+ *
+ * @param upstream a reference stream with element type T
+ * @param predicate the predicate that returns false to halt dropping.
+ */
+ static LongStream makeDropWhileLong(AbstractPipeline<?, Long, ?> upstream,
+ LongPredicate predicate) {
+ Objects.requireNonNull(predicate);
+ class Op extends LongPipeline.StatefulOp<Long> implements DropWhileOp<Long> {
+ public Op(AbstractPipeline<?, Long, ?> upstream, StreamShape inputShape, int opFlags) {
+ super(upstream, inputShape, opFlags);
+ }
+
+ @Override
+ <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+ return opEvaluateParallel(helper, spliterator, Long[]::new)
+ .spliterator();
+ }
+ else {
+ return new UnorderedWhileSpliterator.OfLong.Dropping(
+ (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate);
+ }
+ }
+
+ @Override
+ <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<Long[]> generator) {
+ return new DropWhileTask<>(this, helper, spliterator, generator)
+ .invoke();
+ }
+
+ @Override
+ Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
+ return opWrapSink(sink, false);
+ }
+
+ public DropWhileSink<Long> opWrapSink(Sink<Long> sink, boolean retainAndCountDroppedElements) {
+ class OpSink extends Sink.ChainedLong<Long> implements DropWhileSink<Long> {
+ long dropCount;
+ boolean take;
+
+ OpSink() {
+ super(sink);
+ }
+
+ @Override
+ public void accept(long t) {
+ boolean takeElement = take || (take = !predicate.test(t));
+
+ // If ordered and element is dropped increment index
+ // for possible future truncation
+ if (retainAndCountDroppedElements && !takeElement)
+ dropCount++;
+
+ // If ordered need to process element, otherwise
+ // skip if element is dropped
+ if (retainAndCountDroppedElements || takeElement)
+ downstream.accept(t);
+ }
+
+ @Override
+ public long getDropCount() {
+ return dropCount;
+ }
+ }
+ return new OpSink();
+ }
+ }
+ return new Op(upstream, StreamShape.LONG_VALUE, DROP_FLAGS);
+ }
+
+ /**
+ * Appends a "dropWhile" operation to the provided DoubleStream.
+ *
+ * @param upstream a reference stream with element type T
+ * @param predicate the predicate that returns false to halt dropping.
+ */
+ static DoubleStream makeDropWhileDouble(AbstractPipeline<?, Double, ?> upstream,
+ DoublePredicate predicate) {
+ Objects.requireNonNull(predicate);
+ class Op extends DoublePipeline.StatefulOp<Double> implements DropWhileOp<Double> {
+ public Op(AbstractPipeline<?, Double, ?> upstream, StreamShape inputShape, int opFlags) {
+ super(upstream, inputShape, opFlags);
+ }
+
+ @Override
+ <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
+ Spliterator<P_IN> spliterator) {
+ if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+ return opEvaluateParallel(helper, spliterator, Double[]::new)
+ .spliterator();
+ }
+ else {
+ return new UnorderedWhileSpliterator.OfDouble.Dropping(
+ (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate);
+ }
+ }
+
+ @Override
+ <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<Double[]> generator) {
+ return new DropWhileTask<>(this, helper, spliterator, generator)
+ .invoke();
+ }
+
+ @Override
+ Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
+ return opWrapSink(sink, false);
+ }
+
+ public DropWhileSink<Double> opWrapSink(Sink<Double> sink, boolean retainAndCountDroppedElements) {
+ class OpSink extends Sink.ChainedDouble<Double> implements DropWhileSink<Double> {
+ long dropCount;
+ boolean take;
+
+ OpSink() {
+ super(sink);
+ }
+
+ @Override
+ public void accept(double t) {
+ boolean takeElement = take || (take = !predicate.test(t));
+
+ // If ordered and element is dropped increment index
+ // for possible future truncation
+ if (retainAndCountDroppedElements && !takeElement)
+ dropCount++;
+
+ // If ordered need to process element, otherwise
+ // skip if element is dropped
+ if (retainAndCountDroppedElements || takeElement)
+ downstream.accept(t);
+ }
+
+ @Override
+ public long getDropCount() {
+ return dropCount;
+ }
+ }
+ return new OpSink();
+ }
+ }
+ return new Op(upstream, StreamShape.DOUBLE_VALUE, DROP_FLAGS);
+ }
+
+ //
+
+ /**
+ * A spliterator supporting takeWhile and dropWhile operations over an
+ * underlying spliterator whose covered elements have no encounter order.
+ * <p>
+ * Concrete subclasses of this spliterator support reference and primitive
+ * types for takeWhile and dropWhile.
+ * <p>
+ * For the takeWhile operation if during traversal taking completes then
+ * taking is cancelled globally for the splitting and traversal of all
+ * related spliterators.
+ * Cancellation is governed by a shared {@link AtomicBoolean} instance. A
+ * spliterator in the process of taking when cancellation occurs will also
+ * be cancelled but not necessarily immediately. To reduce contention on
+ * the {@link AtomicBoolean} instance, cancellation make be acted on after
+ * a small number of additional elements have been traversed.
+ * <p>
+ * For the dropWhile operation if during traversal dropping completes for
+ * some, but not all elements, then it is cancelled globally for the
+ * traversal of all related spliterators (splitting is not cancelled).
+ * Cancellation is governed in the same manner as for the takeWhile
+ * operation.
+ *
+ * @param <T> the type of elements returned by this spliterator
+ * @param <T_SPLITR> the type of the spliterator
+ */
+ static abstract class UnorderedWhileSpliterator<T, T_SPLITR extends Spliterator<T>> implements Spliterator<T> {
+ // Power of two constant minus one used for modulus of count
+ static final int CANCEL_CHECK_COUNT = (1 << 6) - 1;
+
+ // The underlying spliterator
+ final T_SPLITR s;
+ // True if no splitting should be performed, if true then
+ // this spliterator may be used for an underlying spliterator whose
+ // covered elements have an encounter order
+ // See use in stream take/dropWhile default default methods
+ final boolean noSplitting;
+ // True when operations are cancelled for all related spliterators
+ // For taking, spliterators cannot split or traversed
+ // For dropping, spliterators cannot be traversed
+ final AtomicBoolean cancel;
+ // True while taking or dropping should be performed when traversing
+ boolean takeOrDrop = true;
+ // The count of elements traversed
+ int count;
+
+ UnorderedWhileSpliterator(T_SPLITR s, boolean noSplitting) {
+ this.s = s;
+ this.noSplitting = noSplitting;
+ this.cancel = new AtomicBoolean();
+ }
+
+ UnorderedWhileSpliterator(T_SPLITR s, UnorderedWhileSpliterator<T, T_SPLITR> parent) {
+ this.s = s;
+ this.noSplitting = parent.noSplitting;
+ this.cancel = parent.cancel;
+ }
+
+ @Override
+ public long estimateSize() {
+ return s.estimateSize();
+ }
+
+ @Override
+ public int characteristics() {
+ // Size is not known
+ return s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
+ }
+
+ @Override
+ public long getExactSizeIfKnown() {
+ return -1L;
+ }
+
+ @Override
+ public Comparator<? super T> getComparator() {
+ return s.getComparator();
+ }
+
+ @Override
+ public T_SPLITR trySplit() {
+ @SuppressWarnings("unchecked")
+ T_SPLITR ls = noSplitting ? null : (T_SPLITR) s.trySplit();
+ return ls != null ? makeSpliterator(ls) : null;
+ }
+
+ boolean checkCancelOnCount() {
+ return count != 0 || !cancel.get();
+ }
+
+ abstract T_SPLITR makeSpliterator(T_SPLITR s);
+
+ static abstract class OfRef<T> extends UnorderedWhileSpliterator<T, Spliterator<T>> implements Consumer<T> {
+ final Predicate<? super T> p;
+ T t;
+
+ OfRef(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
+ super(s, noSplitting);
+ this.p = p;
+ }
+
+ OfRef(Spliterator<T> s, OfRef<T> parent) {
+ super(s, parent);
+ this.p = parent.p;
+ }
+
+ @Override
+ public void accept(T t) {
+ count = (count + 1) & CANCEL_CHECK_COUNT;
+ this.t = t;
+ }
+
+ static final class Taking<T> extends OfRef<T> {
+ Taking(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
+ super(s, noSplitting, p);
+ }
+
+ Taking(Spliterator<T> s, Taking<T> parent) {
+ super(s, parent);
+ }
+
+ @Override
+ public boolean tryAdvance(Consumer<? super T> action) {
+ boolean test = true;
+ if (takeOrDrop && // If can take
+ checkCancelOnCount() && // and if not cancelled
+ s.tryAdvance(this) && // and if advanced one element
+ (test = p.test(t))) { // and test on element passes
+ action.accept(t); // then accept element
+ return true;
+ }
+ else {
+ // Taking is finished
+ takeOrDrop = false;
+ // Cancel all further traversal and splitting operations
+ // only if test of element failed (short-circuited)
+ if (!test)
+ cancel.set(true);
+ return false;
+ }
+ }
+
+ @Override
+ public Spliterator<T> trySplit() {
+ // Do not split if all operations are cancelled
+ return cancel.get() ? null : super.trySplit();
+ }
+
+ @Override
+ Spliterator<T> makeSpliterator(Spliterator<T> s) {
+ return new Taking<>(s, this);
+ }
+ }
+
+ static final class Dropping<T> extends OfRef<T> {
+ Dropping(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
+ super(s, noSplitting, p);
+ }
+
+ Dropping(Spliterator<T> s, Dropping<T> parent) {
+ super(s, parent);
+ }
+
+ @Override
+ public boolean tryAdvance(Consumer<? super T> action) {
+ if (takeOrDrop) {
+ takeOrDrop = false;
+ boolean adv;
+ boolean dropped = false;
+ while ((adv = s.tryAdvance(this)) && // If advanced one element
+ checkCancelOnCount() && // and if not cancelled
+ p.test(t)) { // and test on element passes
+ dropped = true; // then drop element
+ }
+
+ // Report advanced element, if any
+ if (adv) {
+ // Cancel all further dropping if one or more elements
+ // were previously dropped
+ if (dropped)
+ cancel.set(true);
+ action.accept(t);
+ }
+ return adv;
+ }
+ else {
+ return s.tryAdvance(action);
+ }
+ }
+
+ @Override
+ Spliterator<T> makeSpliterator(Spliterator<T> s) {
+ return new Dropping<>(s, this);
+ }
+ }
+ }
+
+ static abstract class OfInt extends UnorderedWhileSpliterator<Integer, Spliterator.OfInt> implements IntConsumer, Spliterator.OfInt {
+ final IntPredicate p;
+ int t;
+
+ OfInt(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
+ super(s, noSplitting);
+ this.p = p;
+ }
+
+ OfInt(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
+ super(s, parent);
+ this.p = parent.p;
+ }
+
+ @Override
+ public void accept(int t) {
+ count = (count + 1) & CANCEL_CHECK_COUNT;
+ this.t = t;
+ }
+
+ static final class Taking extends UnorderedWhileSpliterator.OfInt {
+ Taking(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
+ super(s, noSplitting, p);
+ }
+
+ Taking(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
+ super(s, parent);
+ }
+
+ @Override
+ public boolean tryAdvance(IntConsumer action) {
+ boolean test = true;
+ if (takeOrDrop && // If can take
+ checkCancelOnCount() && // and if not cancelled
+ s.tryAdvance(this) && // and if advanced one element
+ (test = p.test(t))) { // and test on element passes
+ action.accept(t); // then accept element
+ return true;
+ }
+ else {
+ // Taking is finished
+ takeOrDrop = false;
+ // Cancel all further traversal and splitting operations
+ // only if test of element failed (short-circuited)
+ if (!test)
+ cancel.set(true);
+ return false;
+ }
+ }
+
+ @Override
+ public Spliterator.OfInt trySplit() {
+ // Do not split if all operations are cancelled
+ return cancel.get() ? null : super.trySplit();
+ }
+
+ @Override
+ Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
+ return new Taking(s, this);
+ }
+ }
+
+ static final class Dropping extends UnorderedWhileSpliterator.OfInt {
+ Dropping(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
+ super(s, noSplitting, p);
+ }
+
+ Dropping(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
+ super(s, parent);
+ }
+
+ @Override
+ public boolean tryAdvance(IntConsumer action) {
+ if (takeOrDrop) {
+ takeOrDrop = false;
+ boolean adv;
+ boolean dropped = false;
+ while ((adv = s.tryAdvance(this)) && // If advanced one element
+ checkCancelOnCount() && // and if not cancelled
+ p.test(t)) { // and test on element passes
+ dropped = true; // then drop element
+ }
+
+ // Report advanced element, if any
+ if (adv) {
+ // Cancel all further dropping if one or more elements
+ // were previously dropped
+ if (dropped)
+ cancel.set(true);
+ action.accept(t);
+ }
+ return adv;
+ }
+ else {
+ return s.tryAdvance(action);
+ }
+ }
+
+ @Override
+ Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
+ return new Dropping(s, this);
+ }
+ }
+ }
+
+ static abstract class OfLong extends UnorderedWhileSpliterator<Long, Spliterator.OfLong> implements LongConsumer, Spliterator.OfLong {
+ final LongPredicate p;
+ long t;
+
+ OfLong(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
+ super(s, noSplitting);
+ this.p = p;
+ }
+
+ OfLong(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
+ super(s, parent);
+ this.p = parent.p;
+ }
+
+ @Override
+ public void accept(long t) {
+ count = (count + 1) & CANCEL_CHECK_COUNT;
+ this.t = t;
+ }
+
+ static final class Taking extends UnorderedWhileSpliterator.OfLong {
+ Taking(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
+ super(s, noSplitting, p);
+ }
+
+ Taking(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
+ super(s, parent);
+ }
+
+ @Override
+ public boolean tryAdvance(LongConsumer action) {
+ boolean test = true;
+ if (takeOrDrop && // If can take
+ checkCancelOnCount() && // and if not cancelled
+ s.tryAdvance(this) && // and if advanced one element
+ (test = p.test(t))) { // and test on element passes
+ action.accept(t); // then accept element
+ return true;
+ }
+ else {
+ // Taking is finished
+ takeOrDrop = false;
+ // Cancel all further traversal and splitting operations
+ // only if test of element failed (short-circuited)
+ if (!test)
+ cancel.set(true);
+ return false;
+ }
+ }
+
+ @Override
+ public Spliterator.OfLong trySplit() {
+ // Do not split if all operations are cancelled
+ return cancel.get() ? null : super.trySplit();
+ }
+
+ @Override
+ Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
+ return new Taking(s, this);
+ }
+ }
+
+ static final class Dropping extends UnorderedWhileSpliterator.OfLong {
+ Dropping(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
+ super(s, noSplitting, p);
+ }
+
+ Dropping(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
+ super(s, parent);
+ }
+
+ @Override
+ public boolean tryAdvance(LongConsumer action) {
+ if (takeOrDrop) {
+ takeOrDrop = false;
+ boolean adv;
+ boolean dropped = false;
+ while ((adv = s.tryAdvance(this)) && // If advanced one element
+ checkCancelOnCount() && // and if not cancelled
+ p.test(t)) { // and test on element passes
+ dropped = true; // then drop element
+ }
+
+ // Report advanced element, if any
+ if (adv) {
+ // Cancel all further dropping if one or more elements
+ // were previously dropped
+ if (dropped)
+ cancel.set(true);
+ action.accept(t);
+ }
+ return adv;
+ }
+ else {
+ return s.tryAdvance(action);
+ }
+ }
+
+ @Override
+ Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
+ return new Dropping(s, this);
+ }
+ }
+ }
+
+ static abstract class OfDouble extends UnorderedWhileSpliterator<Double, Spliterator.OfDouble> implements DoubleConsumer, Spliterator.OfDouble {
+ final DoublePredicate p;
+ double t;
+
+ OfDouble(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
+ super(s, noSplitting);
+ this.p = p;
+ }
+
+ OfDouble(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
+ super(s, parent);
+ this.p = parent.p;
+ }
+
+ @Override
+ public void accept(double t) {
+ count = (count + 1) & CANCEL_CHECK_COUNT;
+ this.t = t;
+ }
+
+ static final class Taking extends UnorderedWhileSpliterator.OfDouble {
+ Taking(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
+ super(s, noSplitting, p);
+ }
+
+ Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
+ super(s, parent);
+ }
+
+ @Override
+ public boolean tryAdvance(DoubleConsumer action) {
+ boolean test = true;
+ if (takeOrDrop && // If can take
+ checkCancelOnCount() && // and if not cancelled
+ s.tryAdvance(this) && // and if advanced one element
+ (test = p.test(t))) { // and test on element passes
+ action.accept(t); // then accept element
+ return true;
+ }
+ else {
+ // Taking is finished
+ takeOrDrop = false;
+ // Cancel all further traversal and splitting operations
+ // only if test of element failed (short-circuited)
+ if (!test)
+ cancel.set(true);
+ return false;
+ }
+ }
+
+ @Override
+ public Spliterator.OfDouble trySplit() {
+ // Do not split if all operations are cancelled
+ return cancel.get() ? null : super.trySplit();
+ }
+
+ @Override
+ Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
+ return new Taking(s, this);
+ }
+ }
+
+ static final class Dropping extends UnorderedWhileSpliterator.OfDouble {
+ Dropping(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
+ super(s, noSplitting, p);
+ }
+
+ Dropping(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
+ super(s, parent);
+ }
+
+ @Override
+ public boolean tryAdvance(DoubleConsumer action) {
+ if (takeOrDrop) {
+ takeOrDrop = false;
+ boolean adv;
+ boolean dropped = false;
+ while ((adv = s.tryAdvance(this)) && // If advanced one element
+ checkCancelOnCount() && // and if not cancelled
+ p.test(t)) { // and test on element passes
+ dropped = true; // then drop element
+ }
+
+ // Report advanced element, if any
+ if (adv) {
+ // Cancel all further dropping if one or more elements
+ // were previously dropped
+ if (dropped)
+ cancel.set(true);
+ action.accept(t);
+ }
+ return adv;
+ }
+ else {
+ return s.tryAdvance(action);
+ }
+ }
+
+ @Override
+ Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
+ return new Dropping(s, this);
+ }
+ }
+ }
+ }
+
+
+ //
+
+ /**
+ * {@code ForkJoinTask} implementing takeWhile computation.
+ * <p>
+ * If the pipeline has encounter order then all tasks to the right of
+ * a task where traversal was short-circuited are cancelled.
+ * The results of completed (and cancelled) tasks are discarded.
+ * The result of merging a short-circuited left task and right task (which
+ * may or may not be short-circuited) is that left task.
+ * <p>
+ * If the pipeline has no encounter order then all tasks to the right of
+ * a task where traversal was short-circuited are cancelled.
+ * The results of completed (and possibly cancelled) tasks are not
+ * discarded, as there is no need to throw away computed results.
+ * The result of merging does not change if a left task was
+ * short-circuited.
+ * No attempt is made, once a leaf task stopped taking, for it to cancel
+ * all other tasks, and further more, short-circuit the computation with its
+ * result.
+ *
+ * @param <P_IN> Input element type to the stream pipeline
+ * @param <P_OUT> Output element type from the stream pipeline
+ */
+ @SuppressWarnings("serial")
+ private static final class TakeWhileTask<P_IN, P_OUT>
+ extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, TakeWhileTask<P_IN, P_OUT>> {
+ private final AbstractPipeline<P_OUT, P_OUT, ?> op;
+ private final IntFunction<P_OUT[]> generator;
+ private final boolean isOrdered;
+ private long thisNodeSize;
+ // True if a short-circuited
+ private boolean shortCircuited;
+ // True if completed, must be set after the local result
+ private volatile boolean completed;
+
+ TakeWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
+ PipelineHelper<P_OUT> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<P_OUT[]> generator) {
+ super(helper, spliterator);
+ this.op = op;
+ this.generator = generator;
+ this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
+ }
+
+ TakeWhileTask(TakeWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
+ super(parent, spliterator);
+ this.op = parent.op;
+ this.generator = parent.generator;
+ this.isOrdered = parent.isOrdered;
+ }
+
+ @Override
+ protected TakeWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
+ return new TakeWhileTask<>(this, spliterator);
+ }
+
+ @Override
+ protected final Node<P_OUT> getEmptyResult() {
+ return Nodes.emptyNode(op.getOutputShape());
+ }
+
+ @Override
+ protected final Node<P_OUT> doLeaf() {
+ Node.Builder<P_OUT> builder = helper.makeNodeBuilder(-1, generator);
+ Sink<P_OUT> s = op.opWrapSink(helper.getStreamAndOpFlags(), builder);
+
+ if (shortCircuited = helper.copyIntoWithCancel(helper.wrapSink(s), spliterator)) {
+ // Cancel later nodes if the predicate returned false
+ // during traversal
+ cancelLaterNodes();
+ }
+
+ Node<P_OUT> node = builder.build();
+ thisNodeSize = node.count();
+ return node;
+ }
+
+ @Override
+ public final void onCompletion(CountedCompleter<?> caller) {
+ if (!isLeaf()) {
+ Node<P_OUT> result;
+ shortCircuited = leftChild.shortCircuited | rightChild.shortCircuited;
+ if (isOrdered && canceled) {
+ thisNodeSize = 0;
+ result = getEmptyResult();
+ }
+ else if (isOrdered && leftChild.shortCircuited) {
+ // If taking finished on the left node then
+ // use the left node result
+ thisNodeSize = leftChild.thisNodeSize;
+ result = leftChild.getLocalResult();
+ }
+ else {
+ thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
+ result = merge();
+ }
+
+ setLocalResult(result);
+ }
+
+ completed = true;
+ super.onCompletion(caller);
+ }
+
+ Node<P_OUT> merge() {
+ if (leftChild.thisNodeSize == 0) {
+ // If the left node size is 0 then
+ // use the right node result
+ return rightChild.getLocalResult();
+ }
+ else if (rightChild.thisNodeSize == 0) {
+ // If the right node size is 0 then
+ // use the left node result
+ return leftChild.getLocalResult();
+ }
+ else {
+ // Combine the left and right nodes
+ return Nodes.conc(op.getOutputShape(),
+ leftChild.getLocalResult(), rightChild.getLocalResult());
+ }
+ }
+
+ @Override
+ protected void cancel() {
+ super.cancel();
+ if (isOrdered && completed)
+ // If the task is completed then clear the result, if any
+ // to aid GC
+ setLocalResult(getEmptyResult());
+ }
+ }
+
+ /**
+ * {@code ForkJoinTask} implementing dropWhile computation.
+ * <p>
+ * If the pipeline has encounter order then each leaf task will not
+ * drop elements but will obtain a count of the elements that would have
+ * been otherwise dropped. That count is used as an index to track
+ * elements to be dropped. Merging will update the index so it corresponds
+ * to the index that is the end of the global prefix of elements to be
+ * dropped. The root is truncated according to that index.
+ * <p>
+ * If the pipeline has no encounter order then each leaf task will drop
+ * elements. Leaf tasks are ordinarily merged. No truncation of the root
+ * node is required.
+ * No attempt is made, once a leaf task stopped dropping, for it to cancel
+ * all other tasks, and further more, short-circuit the computation with
+ * its result.
+ *
+ * @param <P_IN> Input element type to the stream pipeline
+ * @param <P_OUT> Output element type from the stream pipeline
+ */
+ @SuppressWarnings("serial")
+ private static final class DropWhileTask<P_IN, P_OUT>
+ extends AbstractTask<P_IN, P_OUT, Node<P_OUT>, DropWhileTask<P_IN, P_OUT>> {
+ private final AbstractPipeline<P_OUT, P_OUT, ?> op;
+ private final IntFunction<P_OUT[]> generator;
+ private final boolean isOrdered;
+ private long thisNodeSize;
+ // The index from which elements of the node should be taken
+ // i.e. the node should be truncated from [takeIndex, thisNodeSize)
+ // Equivalent to the count of dropped elements
+ private long index;
+
+ DropWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
+ PipelineHelper<P_OUT> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<P_OUT[]> generator) {
+ super(helper, spliterator);
+ assert op instanceof DropWhileOp;
+ this.op = op;
+ this.generator = generator;
+ this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
+ }
+
+ DropWhileTask(DropWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
+ super(parent, spliterator);
+ this.op = parent.op;
+ this.generator = parent.generator;
+ this.isOrdered = parent.isOrdered;
+ }
+
+ @Override
+ protected DropWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
+ return new DropWhileTask<>(this, spliterator);
+ }
+
+ @Override
+ protected final Node<P_OUT> doLeaf() {
+ boolean isChild = !isRoot();
+ // If this not the root and pipeline is ordered and size is known
+ // then pre-size the builder
+ long sizeIfKnown = isChild && isOrdered && StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
+ ? op.exactOutputSizeIfKnown(spliterator)
+ : -1;
+ Node.Builder<P_OUT> builder = helper.makeNodeBuilder(sizeIfKnown, generator);
+ @SuppressWarnings("unchecked")
+ DropWhileOp<P_OUT> dropOp = (DropWhileOp<P_OUT>) op;
+ // If this leaf is the root then there is no merging on completion
+ // and there is no need to retain dropped elements
+ DropWhileSink<P_OUT> s = dropOp.opWrapSink(builder, isOrdered && isChild);
+ helper.wrapAndCopyInto(s, spliterator);
+
+ Node<P_OUT> node = builder.build();
+ thisNodeSize = node.count();
+ index = s.getDropCount();
+ return node;
+ }
+
+ @Override
+ public final void onCompletion(CountedCompleter<?> caller) {
+ if (!isLeaf()) {
+ if (isOrdered) {
+ index = leftChild.index;
+ // If a contiguous sequence of dropped elements
+ // include those of the right node, if any
+ if (index == leftChild.thisNodeSize)
+ index += rightChild.index;
+ }
+
+ thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
+ Node<P_OUT> result = merge();
+ setLocalResult(isRoot() ? doTruncate(result) : result);
+ }
+
+ super.onCompletion(caller);
+ }
+
+ private Node<P_OUT> merge() {
+ if (leftChild.thisNodeSize == 0) {
+ // If the left node size is 0 then
+ // use the right node result
+ return rightChild.getLocalResult();
+ }
+ else if (rightChild.thisNodeSize == 0) {
+ // If the right node size is 0 then
+ // use the left node result
+ return leftChild.getLocalResult();
+ }
+ else {
+ // Combine the left and right nodes
+ return Nodes.conc(op.getOutputShape(),
+ leftChild.getLocalResult(), rightChild.getLocalResult());
+ }
+ }
+
+ private Node<P_OUT> doTruncate(Node<P_OUT> input) {
+ return isOrdered
+ ? input.truncate(index, input.count(), generator)
+ : input;
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/DefaultMethodStreams.java Tue Jun 09 07:10:03 2015 +0100
@@ -0,0 +1,984 @@
+/*
+ * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Comparator;
+import java.util.DoubleSummaryStatistics;
+import java.util.IntSummaryStatistics;
+import java.util.Iterator;
+import java.util.LongSummaryStatistics;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.PrimitiveIterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.DoubleBinaryOperator;
+import java.util.function.DoubleConsumer;
+import java.util.function.DoubleFunction;
+import java.util.function.DoublePredicate;
+import java.util.function.DoubleToIntFunction;
+import java.util.function.DoubleToLongFunction;
+import java.util.function.DoubleUnaryOperator;
+import java.util.function.Function;
+import java.util.function.IntBinaryOperator;
+import java.util.function.IntConsumer;
+import java.util.function.IntFunction;
+import java.util.function.IntPredicate;
+import java.util.function.IntToDoubleFunction;
+import java.util.function.IntToLongFunction;
+import java.util.function.IntUnaryOperator;
+import java.util.function.LongBinaryOperator;
+import java.util.function.LongConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongPredicate;
+import java.util.function.LongToDoubleFunction;
+import java.util.function.LongToIntFunction;
+import java.util.function.LongUnaryOperator;
+import java.util.function.ObjDoubleConsumer;
+import java.util.function.ObjIntConsumer;
+import java.util.function.ObjLongConsumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.function.ToDoubleFunction;
+
+import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
+
+import static java.util.stream.Collectors.*;
+
+public final class DefaultMethodStreams {
+
+ static {
+ // Verify that default methods are not overridden
+ verify(DefaultMethodRefStream.class);
+ verify(DefaultMethodIntStream.class);
+ verify(DefaultMethodLongStream.class);
+ verify(DefaultMethodDoubleStream.class);
+ }
+
+ static void verify(Class<?> del) {
+ // Find the stream interface
+ Class<?> s = Stream.of(del.getInterfaces())
+ .filter(c -> BaseStream.class.isAssignableFrom(c))
+ .findFirst().get();
+
+ // Get all default methods on the stream class
+ Set<String> dms = Stream.of(s.getMethods())
+ .filter(m -> !Modifier.isStatic(m.getModifiers()))
+ .filter(m -> !m.isBridge())
+ .filter(Method::isDefault)
+ .map(Method::getName)
+ .collect(toSet());
+
+ // Get all methods on the delegating class
+ Set<String> ims = Stream.of(del.getMethods())
+ .filter(m -> !Modifier.isStatic(m.getModifiers()))
+ .filter(m -> m.getDeclaringClass() == del)
+ .map(Method::getName)
+ .collect(toSet());
+
+ if (ims.stream().anyMatch(dms::contains)) {
+ throw new AssertionError(String.format("%s overrides default methods of %s\n", del, s));
+ }
+ }
+
+ /**
+ * Creates a stream that for the next operation either delegates to
+ * a default method on {@link Stream}, if present for that operation,
+ * otherwise delegates to an underlying stream.
+ *
+ * @param s the underlying stream to be delegated to for non-default
+ * methods.
+ * @param <T> the type of the stream elements
+ * @return the delegating stream
+ */
+ public static <T> Stream<T> delegateTo(Stream<T> s) {
+ return new DefaultMethodRefStream<>(s);
+ }
+
+ /**
+ * Creates a stream that for the next operation either delegates to
+ * a default method on {@link IntStream}, if present for that operation,
+ * otherwise delegates to an underlying stream.
+ *
+ * @param s the underlying stream to be delegated to for non-default
+ * methods.
+ * @return the delegating stream
+ */
+ public static IntStream delegateTo(IntStream s) {
+ return new DefaultMethodIntStream(s);
+ }
+
+ /**
+ * Creates a stream that for the next operation either delegates to
+ * a default method on {@link LongStream}, if present for that operation,
+ * otherwise delegates to an underlying stream.
+ *
+ * @param s the underlying stream to be delegated to for non-default
+ * methods.
+ * @return the delegating stream
+ */
+ public static LongStream delegateTo(LongStream s) {
+ return new DefaultMethodLongStream(s);
+ }
+
+ /**
+ * Creates a stream that for the next operation either delegates to
+ * a default method on {@link DoubleStream}, if present for that operation,
+ * otherwise delegates to an underlying stream.
+ *
+ * @param s the underlying stream to be delegated to for non-default
+ * methods.
+ * @return the delegating stream
+ */
+ public static DoubleStream delegateTo(DoubleStream s) {
+ return new DefaultMethodDoubleStream(s);
+ }
+
+ /**
+ * A stream that delegates the next operation to a default method, if
+ * present, or to the same operation of an underlying stream.
+ *
+ * @param <T> the type of the stream elements
+ */
+ static final class DefaultMethodRefStream<T> implements Stream<T> {
+ final Stream<T> s;
+
+ DefaultMethodRefStream(Stream<T> s) {
+ this.s = s;
+ }
+
+
+ // Delegating non-default methods
+
+ @Override
+ public Stream<T> filter(Predicate<? super T> predicate) {
+ return s.filter(predicate);
+ }
+
+ @Override
+ public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
+ return s.map(mapper);
+ }
+
+ @Override
+ public IntStream mapToInt(ToIntFunction<? super T> mapper) {
+ return s.mapToInt(mapper);
+ }
+
+ @Override
+ public LongStream mapToLong(ToLongFunction<? super T> mapper) {
+ return s.mapToLong(mapper);
+ }
+
+ @Override
+ public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
+ return s.mapToDouble(mapper);
+ }
+
+ @Override
+ public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
+ return s.flatMap(mapper);
+ }
+
+ @Override
+ public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
+ return s.flatMapToInt(mapper);
+ }
+
+ @Override
+ public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
+ return s.flatMapToLong(mapper);
+ }
+
+ @Override
+ public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
+ return s.flatMapToDouble(mapper);
+ }
+
+ @Override
+ public Stream<T> distinct() {
+ return s.distinct();
+ }
+
+ @Override
+ public Stream<T> sorted() {
+ return s.sorted();
+ }
+
+ @Override
+ public Stream<T> sorted(Comparator<? super T> comparator) {
+ return s.sorted(comparator);
+ }
+
+ @Override
+ public Stream<T> peek(Consumer<? super T> action) {
+ return s.peek(action);
+ }
+
+ @Override
+ public Stream<T> limit(long maxSize) {
+ return s.limit(maxSize);
+ }
+
+ @Override
+ public Stream<T> skip(long n) {
+ return s.skip(n);
+ }
+
+ @Override
+ public void forEach(Consumer<? super T> action) {
+ s.forEach(action);
+ }
+
+ @Override
+ public void forEachOrdered(Consumer<? super T> action) {
+ s.forEachOrdered(action);
+ }
+
+ @Override
+ public Object[] toArray() {
+ return s.toArray();
+ }
+
+ @Override
+ public <A> A[] toArray(IntFunction<A[]> generator) {
+ return s.toArray(generator);
+ }
+
+ @Override
+ public T reduce(T identity, BinaryOperator<T> accumulator) {
+ return s.reduce(identity, accumulator);
+ }
+
+ @Override
+ public Optional<T> reduce(BinaryOperator<T> accumulator) {
+ return s.reduce(accumulator);
+ }
+
+ @Override
+ public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
+ return s.reduce(identity, accumulator, combiner);
+ }
+
+ @Override
+ public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
+ return s.collect(supplier, accumulator, combiner);
+ }
+
+ @Override
+ public <R, A> R collect(Collector<? super T, A, R> collector) {
+ return s.collect(collector);
+ }
+
+ @Override
+ public Optional<T> min(Comparator<? super T> comparator) {
+ return s.min(comparator);
+ }
+
+ @Override
+ public Optional<T> max(Comparator<? super T> comparator) {
+ return s.max(comparator);
+ }
+
+ @Override
+ public long count() {
+ return s.count();
+ }
+
+ @Override
+ public boolean anyMatch(Predicate<? super T> predicate) {
+ return s.anyMatch(predicate);
+ }
+
+ @Override
+ public boolean allMatch(Predicate<? super T> predicate) {
+ return s.allMatch(predicate);
+ }
+
+ @Override
+ public boolean noneMatch(Predicate<? super T> predicate) {
+ return s.noneMatch(predicate);
+ }
+
+ @Override
+ public Optional<T> findFirst() {
+ return s.findFirst();
+ }
+
+ @Override
+ public Optional<T> findAny() {
+ return s.findAny();
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return s.iterator();
+ }
+
+ @Override
+ public Spliterator<T> spliterator() {
+ return s.spliterator();
+ }
+
+ @Override
+ public boolean isParallel() {
+ return s.isParallel();
+ }
+
+ @Override
+ public Stream<T> sequential() {
+ return s.sequential();
+ }
+
+ @Override
+ public Stream<T> parallel() {
+ return s.parallel();
+ }
+
+ @Override
+ public Stream<T> unordered() {
+ return s.unordered();
+ }
+
+ @Override
+ public Stream<T> onClose(Runnable closeHandler) {
+ return s.onClose(closeHandler);
+ }
+
+ @Override
+ public void close() {
+ s.close();
+ }
+ }
+
+ static final class DefaultMethodIntStream implements IntStream {
+ final IntStream s;
+
+ public DefaultMethodIntStream(IntStream s) {
+ this.s = s;
+ }
+
+
+ // Delegating non-default methods
+
+ @Override
+ public IntStream filter(IntPredicate predicate) {
+ return s.filter(predicate);
+ }
+
+ @Override
+ public IntStream map(IntUnaryOperator mapper) {
+ return s.map(mapper);
+ }
+
+ @Override
+ public <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
+ return s.mapToObj(mapper);
+ }
+
+ @Override
+ public LongStream mapToLong(IntToLongFunction mapper) {
+ return s.mapToLong(mapper);
+ }
+
+ @Override
+ public DoubleStream mapToDouble(IntToDoubleFunction mapper) {
+ return s.mapToDouble(mapper);
+ }
+
+ @Override
+ public IntStream flatMap(IntFunction<? extends IntStream> mapper) {
+ return s.flatMap(mapper);
+ }
+
+ @Override
+ public IntStream distinct() {
+ return s.distinct();
+ }
+
+ @Override
+ public IntStream sorted() {
+ return s.sorted();
+ }
+
+ @Override
+ public IntStream peek(IntConsumer action) {
+ return s.peek(action);
+ }
+
+ @Override
+ public IntStream limit(long maxSize) {
+ return s.limit(maxSize);
+ }
+
+ @Override
+ public IntStream skip(long n) {
+ return s.skip(n);
+ }
+
+ @Override
+ public void forEach(IntConsumer action) {
+ s.forEach(action);
+ }
+
+ @Override
+ public void forEachOrdered(IntConsumer action) {
+ s.forEachOrdered(action);
+ }
+
+ @Override
+ public int[] toArray() {
+ return s.toArray();
+ }
+
+ @Override
+ public int reduce(int identity, IntBinaryOperator op) {
+ return s.reduce(identity, op);
+ }
+
+ @Override
+ public OptionalInt reduce(IntBinaryOperator op) {
+ return s.reduce(op);
+ }
+
+ @Override
+ public <R> R collect(Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner) {
+ return s.collect(supplier, accumulator, combiner);
+ }
+
+ @Override
+ public int sum() {
+ return s.sum();
+ }
+
+ @Override
+ public OptionalInt min() {
+ return s.min();
+ }
+
+ @Override
+ public OptionalInt max() {
+ return s.max();
+ }
+
+ @Override
+ public long count() {
+ return s.count();
+ }
+
+ @Override
+ public OptionalDouble average() {
+ return s.average();
+ }
+
+ @Override
+ public IntSummaryStatistics summaryStatistics() {
+ return s.summaryStatistics();
+ }
+
+ @Override
+ public boolean anyMatch(IntPredicate predicate) {
+ return s.anyMatch(predicate);
+ }
+
+ @Override
+ public boolean allMatch(IntPredicate predicate) {
+ return s.allMatch(predicate);
+ }
+
+ @Override
+ public boolean noneMatch(IntPredicate predicate) {
+ return s.noneMatch(predicate);
+ }
+
+ @Override
+ public OptionalInt findFirst() {
+ return s.findFirst();
+ }
+
+ @Override
+ public OptionalInt findAny() {
+ return s.findAny();
+ }
+
+ @Override
+ public LongStream asLongStream() {
+ return s.asLongStream();
+ }
+
+ @Override
+ public DoubleStream asDoubleStream() {
+ return s.asDoubleStream();
+ }
+
+ @Override
+ public Stream<Integer> boxed() {
+ return s.boxed();
+ }
+
+ @Override
+ public IntStream sequential() {
+ return s.sequential();
+ }
+
+ @Override
+ public IntStream parallel() {
+ return s.parallel();
+ }
+
+ @Override
+ public PrimitiveIterator.OfInt iterator() {
+ return s.iterator();
+ }
+
+ @Override
+ public Spliterator.OfInt spliterator() {
+ return s.spliterator();
+ }
+
+ @Override
+ public boolean isParallel() {
+ return s.isParallel();
+ }
+
+ @Override
+ public IntStream unordered() {
+ return s.unordered();
+ }
+
+ @Override
+ public IntStream onClose(Runnable closeHandler) {
+ return s.onClose(closeHandler);
+ }
+
+ @Override
+ public void close() {
+ s.close();
+ }
+ }
+
+ static final class DefaultMethodLongStream implements LongStream {
+ final LongStream s;
+
+ public DefaultMethodLongStream(LongStream s) {
+ this.s = s;
+ }
+
+
+ // Delegating non-default methods
+
+ @Override
+ public void forEach(LongConsumer action) {
+ s.forEach(action);
+ }
+
+ @Override
+ public LongStream filter(LongPredicate predicate) {
+ return s.filter(predicate);
+ }
+
+ @Override
+ public LongStream map(LongUnaryOperator mapper) {
+ return s.map(mapper);
+ }
+
+ @Override
+ public <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
+ return s.mapToObj(mapper);
+ }
+
+ @Override
+ public IntStream mapToInt(LongToIntFunction mapper) {
+ return s.mapToInt(mapper);
+ }
+
+ @Override
+ public DoubleStream mapToDouble(LongToDoubleFunction mapper) {
+ return s.mapToDouble(mapper);
+ }
+
+ @Override
+ public LongStream flatMap(LongFunction<? extends LongStream> mapper) {
+ return s.flatMap(mapper);
+ }
+
+ @Override
+ public LongStream distinct() {
+ return s.distinct();
+ }
+
+ @Override
+ public LongStream sorted() {
+ return s.sorted();
+ }
+
+ @Override
+ public LongStream peek(LongConsumer action) {
+ return s.peek(action);
+ }
+
+ @Override
+ public LongStream limit(long maxSize) {
+ return s.limit(maxSize);
+ }
+
+ @Override
+ public LongStream skip(long n) {
+ return s.skip(n);
+ }
+
+ @Override
+ public void forEachOrdered(LongConsumer action) {
+ s.forEachOrdered(action);
+ }
+
+ @Override
+ public long[] toArray() {
+ return s.toArray();
+ }
+
+ @Override
+ public long reduce(long identity, LongBinaryOperator op) {
+ return s.reduce(identity, op);
+ }
+
+ @Override
+ public OptionalLong reduce(LongBinaryOperator op) {
+ return s.reduce(op);
+ }
+
+ @Override
+ public <R> R collect(Supplier<R> supplier, ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner) {
+ return s.collect(supplier, accumulator, combiner);
+ }
+
+ @Override
+ public long sum() {
+ return s.sum();
+ }
+
+ @Override
+ public OptionalLong min() {
+ return s.min();
+ }
+
+ @Override
+ public OptionalLong max() {
+ return s.max();
+ }
+
+ @Override
+ public long count() {
+ return s.count();
+ }
+
+ @Override
+ public OptionalDouble average() {
+ return s.average();
+ }
+
+ @Override
+ public LongSummaryStatistics summaryStatistics() {
+ return s.summaryStatistics();
+ }
+
+ @Override
+ public boolean anyMatch(LongPredicate predicate) {
+ return s.anyMatch(predicate);
+ }
+
+ @Override
+ public boolean allMatch(LongPredicate predicate) {
+ return s.allMatch(predicate);
+ }
+
+ @Override
+ public boolean noneMatch(LongPredicate predicate) {
+ return s.noneMatch(predicate);
+ }
+
+ @Override
+ public OptionalLong findFirst() {
+ return s.findFirst();
+ }
+
+ @Override
+ public OptionalLong findAny() {
+ return s.findAny();
+ }
+
+ @Override
+ public DoubleStream asDoubleStream() {
+ return s.asDoubleStream();
+ }
+
+ @Override
+ public Stream<Long> boxed() {
+ return s.boxed();
+ }
+
+ @Override
+ public LongStream sequential() {
+ return s.sequential();
+ }
+
+ @Override
+ public LongStream parallel() {
+ return s.parallel();
+ }
+
+ @Override
+ public PrimitiveIterator.OfLong iterator() {
+ return s.iterator();
+ }
+
+ @Override
+ public Spliterator.OfLong spliterator() {
+ return s.spliterator();
+ }
+
+ @Override
+ public boolean isParallel() {
+ return s.isParallel();
+ }
+
+ @Override
+ public LongStream unordered() {
+ return s.unordered();
+ }
+
+ @Override
+ public LongStream onClose(Runnable closeHandler) {
+ return s.onClose(closeHandler);
+ }
+
+ @Override
+ public void close() {
+ s.close();
+ }
+ }
+
+ static final class DefaultMethodDoubleStream implements DoubleStream {
+ final DoubleStream s;
+
+ public DefaultMethodDoubleStream(DoubleStream s) {
+ this.s = s;
+ }
+
+ @Override
+ public DoubleStream filter(DoublePredicate predicate) {
+ return s.filter(predicate);
+ }
+
+ @Override
+ public DoubleStream map(DoubleUnaryOperator mapper) {
+ return s.map(mapper);
+ }
+
+ @Override
+ public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
+ return s.mapToObj(mapper);
+ }
+
+ @Override
+ public IntStream mapToInt(DoubleToIntFunction mapper) {
+ return s.mapToInt(mapper);
+ }
+
+ @Override
+ public LongStream mapToLong(DoubleToLongFunction mapper) {
+ return s.mapToLong(mapper);
+ }
+
+ @Override
+ public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
+ return s.flatMap(mapper);
+ }
+
+ @Override
+ public DoubleStream distinct() {
+ return s.distinct();
+ }
+
+ @Override
+ public DoubleStream sorted() {
+ return s.sorted();
+ }
+
+ @Override
+ public DoubleStream peek(DoubleConsumer action) {
+ return s.peek(action);
+ }
+
+ @Override
+ public DoubleStream limit(long maxSize) {
+ return s.limit(maxSize);
+ }
+
+ @Override
+ public DoubleStream skip(long n) {
+ return s.skip(n);
+ }
+
+ @Override
+ public void forEach(DoubleConsumer action) {
+ s.forEach(action);
+ }
+
+ @Override
+ public void forEachOrdered(DoubleConsumer action) {
+ s.forEachOrdered(action);
+ }
+
+ @Override
+ public double[] toArray() {
+ return s.toArray();
+ }
+
+ @Override
+ public double reduce(double identity, DoubleBinaryOperator op) {
+ return s.reduce(identity, op);
+ }
+
+ @Override
+ public OptionalDouble reduce(DoubleBinaryOperator op) {
+ return s.reduce(op);
+ }
+
+ @Override
+ public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) {
+ return s.collect(supplier, accumulator, combiner);
+ }
+
+ @Override
+ public double sum() {
+ return s.sum();
+ }
+
+ @Override
+ public OptionalDouble min() {
+ return s.min();
+ }
+
+ @Override
+ public OptionalDouble max() {
+ return s.max();
+ }
+
+ @Override
+ public long count() {
+ return s.count();
+ }
+
+ @Override
+ public OptionalDouble average() {
+ return s.average();
+ }
+
+ @Override
+ public DoubleSummaryStatistics summaryStatistics() {
+ return s.summaryStatistics();
+ }
+
+ @Override
+ public boolean anyMatch(DoublePredicate predicate) {
+ return s.anyMatch(predicate);
+ }
+
+ @Override
+ public boolean allMatch(DoublePredicate predicate) {
+ return s.allMatch(predicate);
+ }
+
+ @Override
+ public boolean noneMatch(DoublePredicate predicate) {
+ return s.noneMatch(predicate);
+ }
+
+ @Override
+ public OptionalDouble findFirst() {
+ return s.findFirst();
+ }
+
+ @Override
+ public OptionalDouble findAny() {
+ return s.findAny();
+ }
+
+ @Override
+ public Stream<Double> boxed() {
+ return s.boxed();
+ }
+
+ @Override
+ public DoubleStream sequential() {
+ return s.sequential();
+ }
+
+ @Override
+ public DoubleStream parallel() {
+ return s.parallel();
+ }
+
+ @Override
+ public PrimitiveIterator.OfDouble iterator() {
+ return s.iterator();
+ }
+
+ @Override
+ public Spliterator.OfDouble spliterator() {
+ return s.spliterator();
+ }
+
+ @Override
+ public boolean isParallel() {
+ return s.isParallel();
+ }
+
+ @Override
+ public DoubleStream unordered() {
+ return s.unordered();
+ }
+
+ @Override
+ public DoubleStream onClose(Runnable closeHandler) {
+ return s.onClose(closeHandler);
+ }
+
+ @Override
+ public void close() {
+ s.close();
+ }
+ }
+}
\ No newline at end of file
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestDataProvider.java Mon Jul 13 17:44:34 2015 +0800
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestDataProvider.java Tue Jun 09 07:10:03 2015 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -119,7 +119,7 @@
// Simple combination of numbers and null values, probably excessive but may catch
// errors for initialization/termination/sequence
- // @@@ This is separate from the other data for now until nulls are consitently supported by
+ // @@@ This is separate from the other data for now until nulls are consistently supported by
// all operations
{
List<Object[]> list = new ArrayList<>();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpStatefulTest.java Tue Jun 09 07:10:03 2015 +0100
@@ -0,0 +1,304 @@
+/*
+ * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.stream;
+
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.DefaultMethodStreams;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.OpTestCase;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toCollection;
+
+/*
+ * @test
+ * @bug 8071597
+ */
+@Test
+public class WhileOpStatefulTest extends OpTestCase {
+ static final long COUNT_PERIOD = 100;
+
+ static final long EXECUTION_TIME_LIMIT = TimeUnit.SECONDS.toMillis(10);
+
+ static final long TAKE_WHILE_COUNT_LIMIT = 100_000;
+
+ static final int DROP_SOURCE_SIZE = 10_000;
+
+ static final long DROP_WHILE_COUNT_LIMIT = 5000;
+
+ @Test
+ public void testTimedTakeWithCount() {
+ testTakeWhileMulti(
+ s -> {
+ BooleanSupplier isWithinTakePeriod =
+ within(System.currentTimeMillis(), COUNT_PERIOD);
+ s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
+ .mapToLong(e -> 1).reduce(0, Long::sum);
+ },
+ s -> {
+ BooleanSupplier isWithinTakePeriod =
+ within(System.currentTimeMillis(), COUNT_PERIOD);
+ s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
+ .mapToLong(e -> 1).reduce(0, Long::sum);
+ },
+ s -> {
+ BooleanSupplier isWithinTakePeriod =
+ within(System.currentTimeMillis(), COUNT_PERIOD);
+ s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
+ .map(e -> 1).reduce(0, Long::sum);
+ },
+ s -> {
+ BooleanSupplier isWithinTakePeriod =
+ within(System.currentTimeMillis(), COUNT_PERIOD);
+ s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
+ .mapToLong(e -> 1).reduce(0, Long::sum);
+ });
+ }
+
+ @Test
+ public void testCountTakeWithCount() {
+ testTakeWhileMulti(
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
+ .mapToLong(e -> 1).reduce(0, Long::sum);
+ assertTrue(rc <= c.get());
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
+ .mapToLong(e -> 1).reduce(0, Long::sum);
+ assertTrue(rc <= c.get());
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
+ .map(e -> 1).reduce(0, Long::sum);
+ assertTrue(rc <= c.get());
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
+ .mapToLong(e -> 1).reduce(0, Long::sum);
+ assertTrue(rc <= c.get());
+ });
+ }
+
+ @Test
+ public void testCountTakeWithToArray() {
+ testTakeWhileMulti(
+ s -> {
+ AtomicLong c = new AtomicLong();
+ Object[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
+ .toArray();
+ assertTrue(ra.length <= c.get());
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ int[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
+ .toArray();
+ assertTrue(ra.length <= c.get());
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
+ .toArray();
+ assertTrue(ra.length <= c.get());
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ double[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
+ .toArray();
+ assertTrue(ra.length <= c.get());
+ });
+ }
+
+
+ @Test
+ public void testCountDropWithCount() {
+ testDropWhileMulti(
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
+ .mapToLong(e -> 1).reduce(0, Long::sum);
+ assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
+ assertTrue(rc <= DROP_SOURCE_SIZE);
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
+ .mapToLong(e -> 1).reduce(0, Long::sum);
+ assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
+ assertTrue(rc <= DROP_SOURCE_SIZE);
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
+ .map(e -> 1).reduce(0, Long::sum);
+ assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
+ assertTrue(rc <= DROP_SOURCE_SIZE);
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
+ .mapToLong(e -> 1).reduce(0, Long::sum);
+ assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
+ assertTrue(rc <= DROP_SOURCE_SIZE);
+ });
+ }
+
+ @Test
+ public void testCountDropWithToArray() {
+ testDropWhileMulti(
+ s -> {
+ AtomicLong c = new AtomicLong();
+ Object[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
+ .toArray();
+ assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
+ assertTrue(ra.length <= DROP_SOURCE_SIZE);
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ int[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
+ .toArray();
+ assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
+ assertTrue(ra.length <= DROP_SOURCE_SIZE);
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ long[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
+ .toArray();
+ assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
+ assertTrue(ra.length <= DROP_SOURCE_SIZE);
+ },
+ s -> {
+ AtomicLong c = new AtomicLong();
+ double[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
+ .toArray();
+ assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
+ assertTrue(ra.length <= DROP_SOURCE_SIZE);
+ });
+ }
+
+
+ private void testTakeWhileMulti(Consumer<Stream<Integer>> mRef,
+ Consumer<IntStream> mInt,
+ Consumer<LongStream> mLong,
+ Consumer<DoubleStream> mDouble) {
+ Map<String, Supplier<Stream<Integer>>> sources = new HashMap<>();
+ sources.put("Stream.generate()", () -> Stream.generate(() -> 1));
+ sources.put("Stream.iterate()", () -> Stream.iterate(1, x -> 1));
+ sources.put("Stream.iterate().unordered()", () -> Stream.iterate(1, x -> 1));
+ testWhileMulti(sources, mRef, mInt, mLong, mDouble);
+ }
+
+ private void testDropWhileMulti(Consumer<Stream<Integer>> mRef,
+ Consumer<IntStream> mInt,
+ Consumer<LongStream> mLong,
+ Consumer<DoubleStream> mDouble) {
+ Map<String, Supplier<Stream<Integer>>> sources = new HashMap<>();
+ sources.put("IntStream.range().boxed()",
+ () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed());
+ sources.put("IntStream.range().boxed().unordered()",
+ () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed().unordered());
+ sources.put("LinkedList.stream()",
+ () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed()
+ .collect(toCollection(LinkedList::new))
+ .stream());
+ sources.put("LinkedList.stream().unordered()",
+ () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed()
+ .collect(toCollection(LinkedList::new))
+ .stream()
+ .unordered());
+ testWhileMulti(sources, mRef, mInt, mLong, mDouble);
+ }
+
+ private void testWhileMulti(Map<String, Supplier<Stream<Integer>>> sources,
+ Consumer<Stream<Integer>> mRef,
+ Consumer<IntStream> mInt,
+ Consumer<LongStream> mLong,
+ Consumer<DoubleStream> mDouble) {
+ Map<String, Function<Stream<Integer>, Stream<Integer>>> transforms = new HashMap<>();
+ transforms.put("Stream.sequential()", s -> {
+ BooleanSupplier isWithinExecutionPeriod = within(System.currentTimeMillis(),
+ EXECUTION_TIME_LIMIT);
+ return s.peek(e -> {
+ if (!isWithinExecutionPeriod.getAsBoolean()) {
+ throw new RuntimeException();
+ }
+ });
+ });
+ transforms.put("Stream.parallel()", s -> {
+ BooleanSupplier isWithinExecutionPeriod = within(System.currentTimeMillis(),
+ EXECUTION_TIME_LIMIT);
+ return s.parallel()
+ .peek(e -> {
+ if (!isWithinExecutionPeriod.getAsBoolean()) {
+ throw new RuntimeException();
+ }
+ });
+ });
+
+ Map<String, Consumer<Stream<Integer>>> actions = new HashMap<>();
+ actions.put("Ref", mRef);
+ actions.put("Int", s -> mInt.accept(s.mapToInt(e -> e)));
+ actions.put("Long", s -> mLong.accept(s.mapToLong(e -> e)));
+ actions.put("Double", s -> mDouble.accept(s.mapToDouble(e -> e)));
+ actions.put("Ref using defaults", s -> mRef.accept(DefaultMethodStreams.delegateTo(s)));
+ actions.put("Int using defaults", s -> mInt.accept(DefaultMethodStreams.delegateTo(s.mapToInt(e -> e))));
+ actions.put("Long using defaults", s -> mLong.accept(DefaultMethodStreams.delegateTo(s.mapToLong(e -> e))));
+ actions.put("Double using defaults", s -> mDouble.accept(DefaultMethodStreams.delegateTo(s.mapToDouble(e -> e))));
+
+ for (Map.Entry<String, Supplier<Stream<Integer>>> s : sources.entrySet()) {
+ setContext("source", s.getKey());
+
+ for (Map.Entry<String, Function<Stream<Integer>, Stream<Integer>>> t : transforms.entrySet()) {
+ setContext("transform", t.getKey());
+
+ for (Map.Entry<String, Consumer<Stream<Integer>>> a : actions.entrySet()) {
+ setContext("shape", a.getKey());
+
+ Stream<Integer> stream = s.getValue().get();
+ stream = t.getValue().apply(stream);
+ a.getValue().accept(stream);
+ }
+ }
+ }
+ }
+
+ static BooleanSupplier within(long start, long durationInMillis) {
+ return () -> (System.currentTimeMillis() - start) < durationInMillis;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java Tue Jun 09 07:10:03 2015 +0100
@@ -0,0 +1,361 @@
+/*
+ * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.stream;
+
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.DefaultMethodStreams;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LambdaTestHelpers;
+import java.util.stream.LongStream;
+import java.util.stream.OpTestCase;
+import java.util.stream.Stream;
+import java.util.stream.StreamTestDataProvider;
+import java.util.stream.TestData;
+
+/*
+ * @test
+ * @bug 8071597
+ */
+@Test
+public class WhileOpTest extends OpTestCase {
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testTakeWhileOps(String name, TestData.OfRef<Integer> data) {
+ for (int size : sizes(data.size())) {
+ setContext("takeWhile", size);
+
+ testWhileMulti(data,
+ whileResultAsserter(data, WhileOp.Take, e -> e < size),
+ s -> s.takeWhile(e -> e < size),
+ s -> s.takeWhile(e -> e < size),
+ s -> s.takeWhile(e -> e < size),
+ s -> s.takeWhile(e -> e < size));
+
+
+ testWhileMulti(data,
+ whileResultAsserter(data, WhileOp.Take, e -> e < size / 2),
+ s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2),
+ s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2),
+ s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2),
+ s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2));
+ }
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testDropWhileOps(String name, TestData.OfRef<Integer> data) {
+ for (int size : sizes(data.size())) {
+ setContext("dropWhile", size);
+
+ testWhileMulti(data,
+ whileResultAsserter(data, WhileOp.Drop, e -> e < size),
+ s -> s.dropWhile(e -> e < size),
+ s -> s.dropWhile(e -> e < size),
+ s -> s.dropWhile(e -> e < size),
+ s -> s.dropWhile(e -> e < size));
+
+ testWhileMulti(data,
+ whileResultAsserter(data, WhileOp.Drop, e -> e < size),
+ s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size),
+ s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size),
+ s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size),
+ s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size));
+ }
+ }
+
+ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+ public void testDropTakeWhileOps(String name, TestData.OfRef<Integer> data) {
+ for (int size : sizes(data.size())) {
+ setContext("dropWhile", size);
+
+ testWhileMulti(data,
+ whileResultAsserter(data, WhileOp.Undefined, null),
+ s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size),
+ s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size),
+ s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size),
+ s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size));
+ }
+ }
+
+ /**
+ * While operation type to be asserted on
+ */
+ enum WhileOp {
+ /**
+ * The takeWhile operation
+ */
+ Take,
+ /**
+ * The dropWhile operation
+ */
+ Drop,
+ /**
+ * The operation(s) are undefined
+ */
+ Undefined
+ }
+
+ /**
+ * Create a result asserter for takeWhile or dropWhile operations.
+ * <p>
+ * If the stream pipeline consists of the takeWhile operation
+ * ({@link WhileOp#Take}) or the dropWhile operation ({@link WhileOp#Drop})
+ * then specific assertions can be made on the actual result based on the
+ * input elements, {@code inputData}, and whether those elements match the
+ * predicate, {@code p}, of the operation.
+ * <p>
+ * If the input elements have an encounter order then the actual result
+ * is asserted against the result of operating sequentially on input
+ * elements given the predicate and in accordance with the operation
+ * semantics. (The actual result whether produced sequentially or in
+ * parallel should the same.)
+ * <p>
+ * If the input elements have no encounter order then an actual result
+ * is, for practical purposes, considered non-deterministic.
+ * Consider an input list of lists that contains all possible permutations
+ * of the input elements, and a output list of lists that is the result of
+ * applying the pipeline with the operation sequentially to each input
+ * list.
+ * Any list in the output lists is a valid result. It's not practical to
+ * test in such a manner.
+ * For a takeWhile operation the following assertions can be made if
+ * only some of the input elements match the predicate (i.e. taking will
+ * short-circuit the pipeline):
+ * <ol>
+ * <li>The set of output elements is a subset of the set of matching
+ * input elements</li>
+ * <li>The set of output elements and the set of non-matching input
+ * element are disjoint</li>
+ * </ol>
+ * For a dropWhile operation the following assertions can be made:
+ * <ol>
+ * <li>The set of non-matching input elements is a subset of the set of
+ * output elements</li>
+ * <li>The set of matching output elements is a subset of the set of
+ * matching input elements</li>
+ * </ol>
+ *
+ * @param inputData the elements input into the stream pipeline
+ * @param op the operation of the stream pipeline, one of takeWhile,
+ * dropWhile, or an undefined set of operations (possibly including
+ * two or more takeWhile and/or dropWhile operations, or because
+ * the predicate is not stateless).
+ * @param p the stateless predicate applied to the operation, ignored if
+ * the
+ * operation is {@link WhileOp#Undefined}.
+ * @param <T> the type of elements
+ * @return a result asserter
+ */
+ private <T> ResultAsserter<Iterable<T>> whileResultAsserter(Iterable<T> inputData,
+ WhileOp op,
+ Predicate<? super T> p) {
+ return (act, exp, ord, par) -> {
+ if (par & !ord) {
+ List<T> input = new ArrayList<>();
+ inputData.forEach(input::add);
+
+ List<T> output = new ArrayList<>();
+ act.forEach(output::add);
+
+ if (op == WhileOp.Take) {
+ List<T> matchingInput = new ArrayList<>();
+ List<T> nonMatchingInput = new ArrayList<>();
+ input.forEach(t -> {
+ if (p.test(t))
+ matchingInput.add(t);
+ else
+ nonMatchingInput.add(t);
+ });
+
+ // If some, not all, elements are taken
+ if (matchingInput.size() < input.size()) {
+ assertTrue(output.size() <= matchingInput.size(),
+ "Output is larger than the matching input");
+
+ // The output must be a subset of the matching input
+ assertTrue(matchingInput.containsAll(output),
+ "Output is not a subset of the matching input");
+
+ // The output must not contain any non matching elements
+ for (T nonMatching : nonMatchingInput) {
+ assertFalse(output.contains(nonMatching),
+ "Output and non-matching input are not disjoint");
+ }
+ }
+ }
+ else if (op == WhileOp.Drop) {
+ List<T> matchingInput = new ArrayList<>();
+ List<T> nonMatchingInput = new ArrayList<>();
+ input.forEach(t -> {
+ if (p.test(t))
+ matchingInput.add(t);
+ else
+ nonMatchingInput.add(t);
+ });
+
+ // The non matching input must be a subset of output
+ assertTrue(output.containsAll(nonMatchingInput),
+ "Non-matching input is not a subset of the output");
+
+ // The matching output must be a subset of the matching input
+ List<T> matchingOutput = new ArrayList<>();
+ output.forEach(i -> {
+ if (p.test(i))
+ matchingOutput.add(i);
+ });
+ assertTrue(matchingInput.containsAll(matchingOutput),
+ "Matching output is not a subset of matching input");
+ }
+
+ // Note: if there is a combination of takeWhile and dropWhile then specific
+ // assertions cannot be performed.
+ // All that can be reliably asserted is the output is a subset of the input
+
+ assertTrue(input.containsAll(output));
+ }
+ else {
+ // For specific operations derive expected result from the input
+ if (op == WhileOp.Take) {
+ List<T> takeInput = new ArrayList<>();
+ for (T t : inputData) {
+ if (p.test(t))
+ takeInput.add(t);
+ else
+ break;
+ }
+
+ LambdaTestHelpers.assertContents(act, takeInput);
+ }
+ else if (op == WhileOp.Drop) {
+ List<T> dropInput = new ArrayList<>();
+ for (T t : inputData) {
+ if (dropInput.size() > 0 || !p.test(t))
+ dropInput.add(t);
+ }
+
+ LambdaTestHelpers.assertContents(act, dropInput);
+ }
+
+ LambdaTestHelpers.assertContents(act, exp);
+ }
+ };
+ }
+
+ private Collection<Integer> sizes(int s) {
+ Set<Integer> sizes = new LinkedHashSet<>();
+
+ sizes.add(0);
+ sizes.add(1);
+ sizes.add(s / 4);
+ sizes.add(s / 2);
+ sizes.add(3 * s / 4);
+ sizes.add(Math.max(0, s - 1));
+ sizes.add(s);
+ sizes.add(Integer.MAX_VALUE);
+
+ return sizes;
+ }
+
+ private void testWhileMulti(TestData.OfRef<Integer> data,
+ ResultAsserter<Iterable<Integer>> ra,
+ Function<Stream<Integer>, Stream<Integer>> mRef,
+ Function<IntStream, IntStream> mInt,
+ Function<LongStream, LongStream> mLong,
+ Function<DoubleStream, DoubleStream> mDouble) {
+ Map<String, Function<Stream<Integer>, Stream<Integer>>> ms = new HashMap<>();
+ ms.put("Ref", mRef);
+ ms.put("Int", s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
+ ms.put("Long", s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
+ ms.put("Double", s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
+ ms.put("Ref using defaults", s -> mRef.apply(DefaultMethodStreams.delegateTo(s)));
+ ms.put("Int using defaults", s -> mInt.apply(DefaultMethodStreams.delegateTo(s.mapToInt(e -> e))).mapToObj(e -> e));
+ ms.put("Long using defaults", s -> mLong.apply(DefaultMethodStreams.delegateTo(s.mapToLong(e -> e))).mapToObj(e -> (int) e));
+ ms.put("Double using defaults", s -> mDouble.apply(DefaultMethodStreams.delegateTo(s.mapToDouble(e -> e))).mapToObj(e -> (int) e));
+
+ testWhileMulti(data, ra, ms);
+ }
+
+ private final void testWhileMulti(TestData.OfRef<Integer> data,
+ ResultAsserter<Iterable<Integer>> ra,
+ Map<String, Function<Stream<Integer>, Stream<Integer>>> ms) {
+ for (Map.Entry<String, Function<Stream<Integer>, Stream<Integer>>> e : ms.entrySet()) {
+ setContext("shape", e.getKey());
+
+ withData(data)
+ .stream(e.getValue())
+ .resultAsserter(ra)
+ .exercise();
+ }
+ }
+
+ @Test
+ public void testRefDefaultClose() {
+ AtomicBoolean isClosed = new AtomicBoolean();
+ Stream<Integer> s = Stream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+ try (Stream<Integer> ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+ ds.count();
+ }
+ assertTrue(isClosed.get());
+ }
+
+ @Test
+ public void testIntDefaultClose() {
+ AtomicBoolean isClosed = new AtomicBoolean();
+ IntStream s = IntStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+ try (IntStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+ ds.count();
+ }
+ assertTrue(isClosed.get());
+ }
+
+ @Test
+ public void testLongDefaultClose() {
+ AtomicBoolean isClosed = new AtomicBoolean();
+ LongStream s = LongStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+ try (LongStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+ ds.count();
+ }
+ assertTrue(isClosed.get());
+ }
+
+ @Test
+ public void testDoubleDefaultClose() {
+ AtomicBoolean isClosed = new AtomicBoolean();
+ DoubleStream s = DoubleStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+ try (DoubleStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+ ds.count();
+ }
+ assertTrue(isClosed.get());
+ }
+}