8008670: Initial java.util.stream putback -- internal API classes
Reviewed-by: mduigou, dholmes
Contributed-by: Brian Goetz <brian.goetz@oracle.com>, Doug Lea <dl@cs.oswego.edu>, Paul Sandoz <paul.sandoz@oracle.com>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Abstract class for fork-join tasks used to implement short-circuiting
+ * stream ops, which can produce a result without processing all elements of the
+ * stream.
+ *
+ * @param <P_IN> type of input elements to the pipeline
+ * @param <P_OUT> type of output elements from the pipeline
+ * @param <R> type of intermediate result, may be different from operation
+ * result type
+ * @param <K> type of child and sibling tasks
+ * @since 1.8
+ */
+abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,
+ K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>
+ extends AbstractTask<P_IN, P_OUT, R, K> {
+ /**
+ * The result for this computation; this is shared among all tasks and set
+ * exactly once
+ */
+ protected final AtomicReference<R> sharedResult;
+
+ /**
+ * Indicates whether this task has been canceled. Tasks may cancel other
+ * tasks in the computation under various conditions, such as in a
+ * find-first operation, a task that finds a value will cancel all tasks
+ * that are later in the encounter order.
+ */
+ protected volatile boolean canceled;
+
+ /**
+ * Constructor for root tasks.
+ *
+ * @param helper the {@code PipelineHelper} describing the stream pipeline
+ * up to this operation
+ * @param spliterator the {@code Spliterator} describing the source for this
+ * pipeline
+ */
+ protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper,
+ Spliterator<P_IN> spliterator) {
+ super(helper, spliterator);
+ sharedResult = new AtomicReference<>(null);
+ }
+
+ /**
+ * Constructor for non-root nodes.
+ *
+ * @param parent parent task in the computation tree
+ * @param spliterator the {@code Spliterator} for the portion of the
+ * computation tree described by this task
+ */
+ protected AbstractShortCircuitTask(K parent,
+ Spliterator<P_IN> spliterator) {
+ super(parent, spliterator);
+ sharedResult = parent.sharedResult;
+ }
+
+ /**
+ * Returns the value indicating the computation completed with no task
+ * finding a short-circuitable result. For example, for a "find" operation,
+ * this might be null or an empty {@code Optional}.
+ *
+ * @return the result to return when no task finds a result
+ */
+ protected abstract R getEmptyResult();
+
+ @Override
+ protected boolean canCompute() {
+ // Have we already found an answer?
+ if (sharedResult.get() != null) {
+ tryComplete();
+ return false;
+ } else if (taskCanceled()) {
+ setLocalResult(getEmptyResult());
+ tryComplete();
+ return false;
+ }
+ else {
+ return true;
+ }
+ }
+
+ /**
+ * Declares that a globally valid result has been found. If another task has
+ * not already found the answer, the result is installed in
+ * {@code sharedResult}. The {@code compute()} method will check
+ * {@code sharedResult} before proceeding with computation, so this causes
+ * the computation to terminate early.
+ *
+ * @param result the result found
+ */
+ protected void shortCircuit(R result) {
+ if (result != null)
+ sharedResult.compareAndSet(null, result);
+ }
+
+ /**
+ * Sets a local result for this task. If this task is the root, set the
+ * shared result instead (if not already set).
+ *
+ * @param localResult The result to set for this task
+ */
+ @Override
+ protected void setLocalResult(R localResult) {
+ if (isRoot()) {
+ if (localResult != null)
+ sharedResult.compareAndSet(null, localResult);
+ }
+ else
+ super.setLocalResult(localResult);
+ }
+
+ /**
+ * Retrieves the local result for this task
+ */
+ @Override
+ public R getRawResult() {
+ return getLocalResult();
+ }
+
+ /**
+ * Retrieves the local result for this task. If this task is the root,
+ * retrieves the shared result instead.
+ */
+ @Override
+ public R getLocalResult() {
+ if (isRoot()) {
+ R answer = sharedResult.get();
+ return (answer == null) ? getEmptyResult() : answer;
+ }
+ else
+ return super.getLocalResult();
+ }
+
+ /**
+ * Mark this task as canceled
+ */
+ protected void cancel() {
+ canceled = true;
+ }
+
+ /**
+ * Queries whether this task is canceled. A task is considered canceled if
+ * it or any of its parents have been canceled.
+ *
+ * @return {@code true} if this task or any parent is canceled.
+ */
+ protected boolean taskCanceled() {
+ boolean cancel = canceled;
+ if (!cancel) {
+ for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())
+ cancel = parent.canceled;
+ }
+
+ return cancel;
+ }
+
+ /**
+ * Cancels all tasks which succeed this one in the encounter order. This
+ * includes canceling all the current task's right sibling, as well as the
+ * later right siblings of all its parents.
+ */
+ protected void cancelLaterNodes() {
+ // Go up the tree, cancel right siblings of this node and all parents
+ for (K parent = getParent(), node = (K) this; parent != null;
+ node = parent, parent = parent.getParent()) {
+ // If node is a left child of parent, then has a right sibling
+ if (parent.leftChild == node) {
+ K rightSibling = parent.rightChild;
+ if (!rightSibling.canceled)
+ rightSibling.cancel();
+ }
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/AbstractTask.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,373 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.ForkJoinPool;
+
+/**
+ * Abstract base class for most fork-join tasks used to implement stream ops.
+ * Manages splitting logic, tracking of child tasks, and intermediate results.
+ * Each task is associated with a {@link Spliterator} that describes the portion
+ * of the input associated with the subtree rooted at this task.
+ * Tasks may be leaf nodes (which will traverse the elements of
+ * the {@code Spliterator}) or internal nodes (which split the
+ * {@code Spliterator} into multiple child tasks).
+ *
+ * @implNote
+ * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
+ * where each task has a semaphore-like count of uncompleted children, and the
+ * task is implicitly completed and notified when its last child completes.
+ * Internal node tasks will likely override the {@code onCompletion} method from
+ * {@code CountedCompleter} to merge the results from child tasks into the
+ * current task's result.
+ *
+ * <p>Splitting and setting up the child task links is done by {@code compute()}
+ * for internal nodes. At {@code compute()} time for leaf nodes, it is
+ * guaranteed that the parent's child-related fields (including sibling links
+ * for the parent's children) will be set up for all children.
+ *
+ * <p>For example, a task that performs a reduce would override {@code doLeaf()}
+ * to perform a reduction on that leaf node's chunk using the
+ * {@code Spliterator}, and override {@code onCompletion()} to merge the results
+ * of the child tasks for internal nodes:
+ *
+ * <pre>{@code
+ * protected S doLeaf() {
+ * spliterator.forEach(...);
+ * return localReductionResult;
+ * }
+ *
+ * public void onCompletion(CountedCompleter caller) {
+ * if (!isLeaf()) {
+ * ReduceTask<P_IN, P_OUT, T, R> child = children;
+ * R result = child.getLocalResult();
+ * child = child.nextSibling;
+ * for (; child != null; child = child.nextSibling)
+ * result = combine(result, child.getLocalResult());
+ * setLocalResult(result);
+ * }
+ * }
+ * }</pre>
+ *
+ * @param <P_IN> Type of elements input to the pipeline
+ * @param <P_OUT> Type of elements output from the pipeline
+ * @param <R> Type of intermediate result, which may be different from operation
+ * result type
+ * @param <K> Type of parent, child and sibling tasks
+ * @since 1.8
+ */
+abstract class AbstractTask<P_IN, P_OUT, R,
+ K extends AbstractTask<P_IN, P_OUT, R, K>>
+ extends CountedCompleter<R> {
+
+ /**
+ * Default target factor of leaf tasks for parallel decomposition.
+ * To allow load balancing, we over-partition, currently to approximately
+ * four tasks per processor, which enables others to help out
+ * if leaf tasks are uneven or some processors are otherwise busy.
+ */
+ static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
+
+ /** The pipeline helper, common to all tasks in a computation */
+ protected final PipelineHelper<P_OUT> helper;
+
+ /**
+ * The spliterator for the portion of the input associated with the subtree
+ * rooted at this task
+ */
+ protected Spliterator<P_IN> spliterator;
+
+ /** Target leaf size, common to all tasks in a computation */
+ protected final long targetSize;
+
+ /**
+ * The left child.
+ * null if no children
+ * if non-null rightChild is non-null
+ */
+ protected K leftChild;
+
+ /**
+ * The right child.
+ * null if no children
+ * if non-null leftChild is non-null
+ */
+ protected K rightChild;
+
+ /** The result of this node, if completed */
+ private R localResult;
+
+ /**
+ * Constructor for root nodes.
+ *
+ * @param helper The {@code PipelineHelper} describing the stream pipeline
+ * up to this operation
+ * @param spliterator The {@code Spliterator} describing the source for this
+ * pipeline
+ */
+ protected AbstractTask(PipelineHelper<P_OUT> helper,
+ Spliterator<P_IN> spliterator) {
+ super(null);
+ this.helper = helper;
+ this.spliterator = spliterator;
+ this.targetSize = suggestTargetSize(spliterator.estimateSize());
+ }
+
+ /**
+ * Constructor for non-root nodes.
+ *
+ * @param parent this node's parent task
+ * @param spliterator {@code Spliterator} describing the subtree rooted at
+ * this node, obtained by splitting the parent {@code Spliterator}
+ */
+ protected AbstractTask(K parent,
+ Spliterator<P_IN> spliterator) {
+ super(parent);
+ this.spliterator = spliterator;
+ this.helper = parent.helper;
+ this.targetSize = parent.targetSize;
+ }
+
+ /**
+ * Constructs a new node of type T whose parent is the receiver; must call
+ * the AbstractTask(T, Spliterator) constructor with the receiver and the
+ * provided Spliterator.
+ *
+ * @param spliterator {@code Spliterator} describing the subtree rooted at
+ * this node, obtained by splitting the parent {@code Spliterator}
+ * @return newly constructed child node
+ */
+ protected abstract K makeChild(Spliterator<P_IN> spliterator);
+
+ /**
+ * Computes the result associated with a leaf node. Will be called by
+ * {@code compute()} and the result passed to @{code setLocalResult()}
+ *
+ * @return the computed result of a leaf node
+ */
+ protected abstract R doLeaf();
+
+ /**
+ * Returns a suggested target leaf size based on the initial size estimate.
+ *
+ * @return suggested target leaf size
+ */
+ public static long suggestTargetSize(long sizeEstimate) {
+ long est = sizeEstimate / LEAF_TARGET;
+ return est > 0L ? est : 1L;
+ }
+
+ /**
+ * Returns a suggestion whether it is advisable to split the provided
+ * spliterator based on target size and other considerations, such as pool
+ * state.
+ *
+ * @return {@code true} if a split is advised otherwise {@code false}
+ */
+ public static boolean suggestSplit(Spliterator spliterator,
+ long targetSize) {
+ long remaining = spliterator.estimateSize();
+ return (remaining > targetSize);
+ // @@@ May additionally want to fold in pool characteristics such as surplus task count
+ }
+
+ /**
+ * Returns a suggestion whether it is adviseable to split this task based on
+ * target size and other considerations.
+ *
+ * @return {@code true} if a split is advised otherwise {@code false}
+ */
+ public boolean suggestSplit() {
+ return suggestSplit(spliterator, targetSize);
+ }
+
+ /**
+ * Returns the local result, if any. Subclasses should use
+ * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
+ * results. This returns the local result so that calls from within the
+ * fork-join framework will return the correct result.
+ *
+ * @return local result for this node previously stored with
+ * {@link #setLocalResult}
+ */
+ @Override
+ public R getRawResult() {
+ return localResult;
+ }
+
+ /**
+ * Does nothing; instead, subclasses should use
+ * {@link #setLocalResult(Object)}} to manage results.
+ *
+ * @param result must be null, or an exception is thrown (this is a safety
+ * tripwire to detect when {@code setRawResult()} is being used
+ * instead of {@code setLocalResult()}
+ */
+ @Override
+ protected void setRawResult(R result) {
+ if (result != null)
+ throw new IllegalStateException();
+ }
+
+ /**
+ * Retrieves a result previously stored with {@link #setLocalResult}
+ *
+ * @return local result for this node previously stored with
+ * {@link #setLocalResult}
+ */
+ protected R getLocalResult() {
+ return localResult;
+ }
+
+ /**
+ * Associates the result with the task, can be retrieved with
+ * {@link #getLocalResult}
+ *
+ * @param localResult local result for this node
+ */
+ protected void setLocalResult(R localResult) {
+ this.localResult = localResult;
+ }
+
+ /**
+ * Indicates whether this task is a leaf node. (Only valid after
+ * {@link #compute} has been called on this node). If the node is not a
+ * leaf node, then children will be non-null and numChildren will be
+ * positive.
+ *
+ * @return {@code true} if this task is a leaf node
+ */
+ protected boolean isLeaf() {
+ return leftChild == null;
+ }
+
+ /**
+ * Indicates whether this task is the root node
+ *
+ * @return {@code true} if this task is the root node.
+ */
+ protected boolean isRoot() {
+ return getParent() == null;
+ }
+
+ /**
+ * Returns the parent of this task, or null if this task is the root
+ *
+ * @return the parent of this task, or null if this task is the root
+ */
+ @SuppressWarnings("unchecked")
+ protected K getParent() {
+ return (K) getCompleter();
+ }
+
+ /**
+ * Decides whether or not to split a task further or compute it directly. If
+ * computing directly, call {@code doLeaf} and pass the result to
+ * {@code setRawResult}. If splitting, set up the child-related fields,
+ * create the child tasks, fork the leftmost (prefix) child tasks, and
+ * compute the rightmost (remaining) child tasks.
+ *
+ * <p>
+ * Computing will continue for rightmost tasks while a task can be computed
+ * as determined by {@link #canCompute()} and that task should and can be
+ * split into left and right tasks.
+ *
+ * <p>
+ * The rightmost tasks are computed in a loop rather than recursively to
+ * avoid potential stack overflows when computing with a right-balanced
+ * tree, such as that produced when splitting with a {@link Spliterator}
+ * created from an {@link java.util.Iterator}.
+ */
+ @Override
+ public final void compute() {
+ @SuppressWarnings("unchecked")
+ K task = (K) this;
+ while (task.canCompute()) {
+ Spliterator<P_IN> split;
+ if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
+ task.setLocalResult(task.doLeaf());
+ task.tryComplete();
+ return;
+ }
+ else {
+ K l = task.leftChild = task.makeChild(split);
+ K r = task.rightChild = task.makeChild(task.spliterator);
+ task.setPendingCount(1);
+ l.fork();
+ task = r;
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implNote
+ * Clears spliterator and children fields. Overriders MUST call
+ * {@code super.onCompletion} as the last thing they do if they want these
+ * cleared.
+ */
+ @Override
+ public void onCompletion(CountedCompleter<?> caller) {
+ spliterator = null;
+ leftChild = rightChild = null;
+ }
+
+ /**
+ * Determines if the task can be computed.
+ *
+ * @implSpec The default always returns true
+ *
+ * @return {@code true} if this task can be computed to either calculate the
+ * leaf via {@link #doLeaf()} or split, otherwise false if this task
+ * cannot be computed, for example if this task has been canceled
+ * and/or a result for the computation has been found by another
+ * task.
+ */
+ protected boolean canCompute() {
+ return true;
+ }
+
+ /**
+ * Returns whether this node is a "leftmost" node -- whether the path from
+ * the root to this node involves only traversing leftmost child links. For
+ * a leaf node, this means it is the first leaf node in the encounter order.
+ *
+ * @return {@code true} if this node is a "leftmost" node
+ */
+ protected boolean isLeftmostNode() {
+ @SuppressWarnings("unchecked")
+ K node = (K) this;
+ while (node != null) {
+ K parent = node.getParent();
+ if (parent != null && parent.leftChild != node)
+ return false;
+ node = parent;
+ }
+ return true;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/FindOps.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,317 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Spliterator;
+import java.util.concurrent.CountedCompleter;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+/**
+ * Factory for instances of a short-circuiting {@code TerminalOp} that searches
+ * for an element in a stream pipeline, and terminates when it finds one.
+ * Supported variants include find-first (find the first element in the
+ * encounter order) and find-any (find any element, may not be the first in
+ * encounter order.)
+ *
+ * @since 1.8
+ */
+final class FindOps {
+
+ private FindOps() { }
+
+ /**
+ * Constructs a {@code TerminalOp} for streams of objects.
+ *
+ * @param <T> the type of elements of the stream
+ * @param mustFindFirst whether the {@code TerminalOp} must produce the
+ * first element in the encounter order
+ * @return a {@code TerminalOp} implementing the find operation
+ */
+ public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
+ return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),
+ Optional::isPresent, FindSink.OfRef::new);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} for streams of ints.
+ *
+ * @param mustFindFirst whether the {@code TerminalOp} must produce the
+ * first element in the encounter order
+ * @return a {@code TerminalOp} implementing the find operation
+ */
+ public static TerminalOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) {
+ return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(),
+ OptionalInt::isPresent, FindSink.OfInt::new);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} for streams of longs.
+ *
+ * @param mustFindFirst whether the {@code TerminalOp} must produce the
+ * first element in the encounter order
+ * @return a {@code TerminalOp} implementing the find operation
+ */
+ public static TerminalOp<Long, OptionalLong> makeLong(boolean mustFindFirst) {
+ return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(),
+ OptionalLong::isPresent, FindSink.OfLong::new);
+ }
+
+ /**
+ * Constructs a {@code FindOp} for streams of doubles.
+ *
+ * @param mustFindFirst whether the {@code TerminalOp} must produce the
+ * first element in the encounter order
+ * @return a {@code TerminalOp} implementing the find operation
+ */
+ public static TerminalOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) {
+ return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(),
+ OptionalDouble::isPresent, FindSink.OfDouble::new);
+ }
+
+ /**
+ * A short-circuiting {@code TerminalOp} that searches for an element in a
+ * stream pipeline, and terminates when it finds one. Implements both
+ * find-first (find the first element in the encounter order) and find-any
+ * (find any element, may not be the first in encounter order.)
+ *
+ * @param <T> the output type of the stream pipeline
+ * @param <O> the result type of the find operation, typically an optional
+ * type
+ */
+ private static final class FindOp<T, O> implements TerminalOp<T, O> {
+ private final StreamShape shape;
+ final boolean mustFindFirst;
+ final O emptyValue;
+ final Predicate<O> presentPredicate;
+ final Supplier<TerminalSink<T, O>> sinkSupplier;
+
+ /**
+ * Constructs a {@code FindOp}.
+ *
+ * @param mustFindFirst if true, must find the first element in
+ * encounter order, otherwise can find any element
+ * @param shape stream shape of elements to search
+ * @param emptyValue result value corresponding to "found nothing"
+ * @param presentPredicate {@code Predicate} on result value
+ * corresponding to "found something"
+ * @param sinkSupplier supplier for a {@code TerminalSink} implementing
+ * the matching functionality
+ */
+ FindOp(boolean mustFindFirst,
+ StreamShape shape,
+ O emptyValue,
+ Predicate<O> presentPredicate,
+ Supplier<TerminalSink<T, O>> sinkSupplier) {
+ this.mustFindFirst = mustFindFirst;
+ this.shape = shape;
+ this.emptyValue = emptyValue;
+ this.presentPredicate = presentPredicate;
+ this.sinkSupplier = sinkSupplier;
+ }
+
+ @Override
+ public int getOpFlags() {
+ return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
+ }
+
+ @Override
+ public StreamShape inputShape() {
+ return shape;
+ }
+
+ @Override
+ public <S> O evaluateSequential(PipelineHelper<T> helper,
+ Spliterator<S> spliterator) {
+ O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
+ return result != null ? result : emptyValue;
+ }
+
+ @Override
+ public <P_IN> O evaluateParallel(PipelineHelper<T> helper,
+ Spliterator<P_IN> spliterator) {
+ return new FindTask<>(this, helper, spliterator).invoke();
+ }
+ }
+
+ /**
+ * Implementation of @{code TerminalSink} that implements the find
+ * functionality, requesting cancellation when something has been found
+ *
+ * @param <T> The type of input element
+ * @param <O> The result type, typically an optional type
+ */
+ private static abstract class FindSink<T, O> implements TerminalSink<T, O> {
+ boolean hasValue;
+ T value;
+
+ FindSink() {} // Avoid creation of special accessor
+
+ @Override
+ public void accept(T value) {
+ if (!hasValue) {
+ hasValue = true;
+ this.value = value;
+ }
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return hasValue;
+ }
+
+ /** Specialization of {@code FindSink} for reference streams */
+ static final class OfRef<T> extends FindSink<T, Optional<T>> {
+ @Override
+ public Optional<T> get() {
+ return hasValue ? Optional.of(value) : null;
+ }
+ }
+
+ /** Specialization of {@code FindSink} for int streams */
+ static final class OfInt extends FindSink<Integer, OptionalInt>
+ implements Sink.OfInt {
+ @Override
+ public void accept(int value) {
+ // Boxing is OK here, since few values will actually flow into the sink
+ accept((Integer) value);
+ }
+
+ @Override
+ public OptionalInt get() {
+ return hasValue ? OptionalInt.of(value) : null;
+ }
+ }
+
+ /** Specialization of {@code FindSink} for long streams */
+ static final class OfLong extends FindSink<Long, OptionalLong>
+ implements Sink.OfLong {
+ @Override
+ public void accept(long value) {
+ // Boxing is OK here, since few values will actually flow into the sink
+ accept((Long) value);
+ }
+
+ @Override
+ public OptionalLong get() {
+ return hasValue ? OptionalLong.of(value) : null;
+ }
+ }
+
+ /** Specialization of {@code FindSink} for double streams */
+ static final class OfDouble extends FindSink<Double, OptionalDouble>
+ implements Sink.OfDouble {
+ @Override
+ public void accept(double value) {
+ // Boxing is OK here, since few values will actually flow into the sink
+ accept((Double) value);
+ }
+
+ @Override
+ public OptionalDouble get() {
+ return hasValue ? OptionalDouble.of(value) : null;
+ }
+ }
+ }
+
+ /**
+ * {@code ForkJoinTask} implementing parallel short-circuiting search
+ * @param <P_IN> Input element type to the stream pipeline
+ * @param <P_OUT> Output element type from the stream pipeline
+ * @param <O> Result type from the find operation
+ */
+ private static final class FindTask<P_IN, P_OUT, O>
+ extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {
+ private final FindOp<P_OUT, O> op;
+
+ FindTask(FindOp<P_OUT, O> op,
+ PipelineHelper<P_OUT> helper,
+ Spliterator<P_IN> spliterator) {
+ super(helper, spliterator);
+ this.op = op;
+ }
+
+ FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) {
+ super(parent, spliterator);
+ this.op = parent.op;
+ }
+
+ @Override
+ protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) {
+ return new FindTask<>(this, spliterator);
+ }
+
+ @Override
+ protected O getEmptyResult() {
+ return op.emptyValue;
+ }
+
+ private void foundResult(O answer) {
+ if (isLeftmostNode())
+ shortCircuit(answer);
+ else
+ cancelLaterNodes();
+ }
+
+ @Override
+ protected O doLeaf() {
+ O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();
+ if (!op.mustFindFirst) {
+ if (result != null)
+ shortCircuit(result);
+ return null;
+ }
+ else {
+ if (result != null) {
+ foundResult(result);
+ return result;
+ }
+ else
+ return null;
+ }
+ }
+
+ @Override
+ public void onCompletion(CountedCompleter<?> caller) {
+ if (op.mustFindFirst) {
+ for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p;
+ p = child, child = rightChild) {
+ O result = child.getLocalResult();
+ if (result != null && op.presentPredicate.test(result)) {
+ setLocalResult(result);
+ foundResult(result);
+ break;
+ }
+ }
+ }
+ super.onCompletion(caller);
+ }
+ }
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/ForEachOps.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,396 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Objects;
+import java.util.Spliterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountedCompleter;
+import java.util.function.Consumer;
+import java.util.function.DoubleConsumer;
+import java.util.function.IntConsumer;
+import java.util.function.LongConsumer;
+
+/**
+ * Factory for creating instances of {@code TerminalOp} that perform an
+ * action for every element of a stream. Supported variants include unordered
+ * traversal (elements are provided to the {@code Consumer} as soon as they are
+ * available), and ordered traversal (elements are provided to the
+ * {@code Consumer} in encounter order.)
+ *
+ * <p>Elements are provided to the {@code Consumer} on whatever thread and
+ * whatever order they become available. For ordered traversals, it is
+ * guaranteed that processing an element <em>happens-before</em> processing
+ * subsequent elements in the encounter order.
+ *
+ * <p>Exceptions occurring as a result of sending an element to the
+ * {@code Consumer} will be relayed to the caller and traversal will be
+ * prematurely terminated.
+ *
+ * @since 1.8
+ */
+final class ForEachOps {
+
+ private ForEachOps() { }
+
+ /**
+ * Constructs a {@code TerminalOp} that perform an action for every element
+ * of a stream.
+ *
+ * @param action the {@code Consumer} that receives all elements of a
+ * stream
+ * @param ordered whether an ordered traversal is requested
+ * @param <T> the type of the stream elements
+ * @return the {@code TerminalOp} instance
+ */
+ public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
+ boolean ordered) {
+ Objects.requireNonNull(action);
+ return new ForEachOp.OfRef<>(action, ordered);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that perform an action for every element
+ * of an {@code IntStream}.
+ *
+ * @param action the {@code IntConsumer} that receives all elements of a
+ * stream
+ * @param ordered whether an ordered traversal is requested
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp<Integer, Void> makeInt(IntConsumer action,
+ boolean ordered) {
+ Objects.requireNonNull(action);
+ return new ForEachOp.OfInt(action, ordered);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that perform an action for every element
+ * of a {@code LongStream}.
+ *
+ * @param action the {@code LongConsumer} that receives all elements of a
+ * stream
+ * @param ordered whether an ordered traversal is requested
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp<Long, Void> makeLong(LongConsumer action,
+ boolean ordered) {
+ Objects.requireNonNull(action);
+ return new ForEachOp.OfLong(action, ordered);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that perform an action for every element
+ * of a {@code DoubleStream}.
+ *
+ * @param action the {@code DoubleConsumer} that receives all elements of
+ * a stream
+ * @param ordered whether an ordered traversal is requested
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action,
+ boolean ordered) {
+ Objects.requireNonNull(action);
+ return new ForEachOp.OfDouble(action, ordered);
+ }
+
+ /**
+ * A {@code TerminalOp} that evaluates a stream pipeline and sends the
+ * output to itself as a {@code TerminalSink}. Elements will be sent in
+ * whatever thread they become available. If the traversal is unordered,
+ * they will be sent independent of the stream's encounter order.
+ *
+ * <p>This terminal operation is stateless. For parallel evaluation, each
+ * leaf instance of a {@code ForEachTask} will send elements to the same
+ * {@code TerminalSink} reference that is an instance of this class.
+ *
+ * @param <T> the output type of the stream pipeline
+ */
+ private static abstract class ForEachOp<T>
+ implements TerminalOp<T, Void>, TerminalSink<T, Void> {
+ private final boolean ordered;
+
+ protected ForEachOp(boolean ordered) {
+ this.ordered = ordered;
+ }
+
+ // TerminalOp
+
+ @Override
+ public int getOpFlags() {
+ return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
+ }
+
+ @Override
+ public <S> Void evaluateSequential(PipelineHelper<T> helper,
+ Spliterator<S> spliterator) {
+ return helper.wrapAndCopyInto(this, spliterator).get();
+ }
+
+ @Override
+ public <S> Void evaluateParallel(PipelineHelper<T> helper,
+ Spliterator<S> spliterator) {
+ if (ordered)
+ new ForEachOrderedTask<>(helper, spliterator, this).invoke();
+ else
+ new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
+ return null;
+ }
+
+ // TerminalSink
+
+ @Override
+ public Void get() {
+ return null;
+ }
+
+ // Implementations
+
+ /** Implementation class for reference streams */
+ private static class OfRef<T> extends ForEachOp<T> {
+ final Consumer<? super T> consumer;
+
+ OfRef(Consumer<? super T> consumer, boolean ordered) {
+ super(ordered);
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void accept(T t) {
+ consumer.accept(t);
+ }
+ }
+
+ /** Implementation class for {@code IntStream} */
+ private static class OfInt extends ForEachOp<Integer>
+ implements Sink.OfInt {
+ final IntConsumer consumer;
+
+ OfInt(IntConsumer consumer, boolean ordered) {
+ super(ordered);
+ this.consumer = consumer;
+ }
+
+ @Override
+ public StreamShape inputShape() {
+ return StreamShape.INT_VALUE;
+ }
+
+ @Override
+ public void accept(int t) {
+ consumer.accept(t);
+ }
+ }
+
+ /** Implementation class for {@code LongStream} */
+ private static class OfLong extends ForEachOp<Long>
+ implements Sink.OfLong {
+ final LongConsumer consumer;
+
+ OfLong(LongConsumer consumer, boolean ordered) {
+ super(ordered);
+ this.consumer = consumer;
+ }
+
+ @Override
+ public StreamShape inputShape() {
+ return StreamShape.LONG_VALUE;
+ }
+
+ @Override
+ public void accept(long t) {
+ consumer.accept(t);
+ }
+ }
+
+ /** Implementation class for {@code DoubleStream} */
+ private static class OfDouble extends ForEachOp<Double>
+ implements Sink.OfDouble {
+ final DoubleConsumer consumer;
+
+ OfDouble(DoubleConsumer consumer, boolean ordered) {
+ super(ordered);
+ this.consumer = consumer;
+ }
+
+ @Override
+ public StreamShape inputShape() {
+ return StreamShape.DOUBLE_VALUE;
+ }
+
+ @Override
+ public void accept(double t) {
+ consumer.accept(t);
+ }
+ }
+ }
+
+ /** A {@code ForkJoinTask} for performing a parallel for-each operation */
+ private static class ForEachTask<S, T> extends CountedCompleter<Void> {
+ private Spliterator<S> spliterator;
+ private final Sink<S> sink;
+ private final PipelineHelper<T> helper;
+ private final long targetSize;
+
+ ForEachTask(PipelineHelper<T> helper,
+ Spliterator<S> spliterator,
+ Sink<S> sink) {
+ super(null);
+ this.spliterator = spliterator;
+ this.sink = sink;
+ this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
+ this.helper = helper;
+ }
+
+ ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
+ super(parent);
+ this.spliterator = spliterator;
+ this.sink = parent.sink;
+ this.targetSize = parent.targetSize;
+ this.helper = parent.helper;
+ }
+
+ public void compute() {
+ boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
+ while (true) {
+ if (isShortCircuit && sink.cancellationRequested()) {
+ propagateCompletion();
+ spliterator = null;
+ return;
+ }
+
+ Spliterator<S> split;
+ if (!AbstractTask.suggestSplit(spliterator, targetSize)
+ || (split = spliterator.trySplit()) == null) {
+ helper.copyInto(sink, spliterator);
+ propagateCompletion();
+ spliterator = null;
+ return;
+ }
+ else {
+ addToPendingCount(1);
+ new ForEachTask<>(this, split).fork();
+ }
+ }
+ }
+ }
+
+ /**
+ * A {@code ForkJoinTask} for performing a parallel for-each operation
+ * which visits the elements in encounter order
+ */
+ private static class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
+ private final PipelineHelper<T> helper;
+ private Spliterator<S> spliterator;
+ private final long targetSize;
+ private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
+ private final Sink<T> action;
+ private final Object lock;
+ private final ForEachOrderedTask<S, T> leftPredecessor;
+ private Node<T> node;
+
+ protected ForEachOrderedTask(PipelineHelper<T> helper,
+ Spliterator<S> spliterator,
+ Sink<T> action) {
+ super(null);
+ this.helper = helper;
+ this.spliterator = spliterator;
+ this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
+ this.completionMap = new ConcurrentHashMap<>();
+ this.action = action;
+ this.lock = new Object();
+ this.leftPredecessor = null;
+ }
+
+ ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
+ Spliterator<S> spliterator,
+ ForEachOrderedTask<S, T> leftPredecessor) {
+ super(parent);
+ this.helper = parent.helper;
+ this.spliterator = spliterator;
+ this.targetSize = parent.targetSize;
+ this.completionMap = parent.completionMap;
+ this.action = parent.action;
+ this.lock = parent.lock;
+ this.leftPredecessor = leftPredecessor;
+ }
+
+ @Override
+ public final void compute() {
+ doCompute(this);
+ }
+
+ private static<S, T> void doCompute(ForEachOrderedTask<S, T> task) {
+ while (true) {
+ Spliterator<S> split;
+ if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
+ || (split = task.spliterator.trySplit()) == null) {
+ if (task.getPendingCount() == 0) {
+ task.helper.wrapAndCopyInto(task.action, task.spliterator);
+ }
+ else {
+ Node.Builder<T> nb = task.helper.makeNodeBuilder(
+ task.helper.exactOutputSizeIfKnown(task.spliterator),
+ size -> (T[]) new Object[size]);
+ task.node = task.helper.wrapAndCopyInto(nb, task.spliterator).build();
+ }
+ task.tryComplete();
+ return;
+ }
+ else {
+ ForEachOrderedTask<S, T> leftChild = new ForEachOrderedTask<>(task, split, task.leftPredecessor);
+ ForEachOrderedTask<S, T> rightChild = new ForEachOrderedTask<>(task, task.spliterator, leftChild);
+ task.completionMap.put(leftChild, rightChild);
+ task.addToPendingCount(1); // forking
+ rightChild.addToPendingCount(1); // right pending on left child
+ if (task.leftPredecessor != null) {
+ leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
+ if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
+ task.addToPendingCount(-1); // transfer my "right child" count to my left child
+ else
+ leftChild.addToPendingCount(-1); // left child is ready to go when ready
+ }
+ leftChild.fork();
+ task = rightChild;
+ }
+ }
+ }
+
+ @Override
+ public void onCompletion(CountedCompleter<?> caller) {
+ spliterator = null;
+ if (node != null) {
+ // Dump any data from this leaf into the sink
+ synchronized (lock) {
+ node.forEach(action);
+ }
+ node = null;
+ }
+ ForEachOrderedTask<S, T> victim = completionMap.remove(this);
+ if (victim != null)
+ victim.tryComplete();
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/MatchOps.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,337 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Objects;
+import java.util.Spliterator;
+import java.util.function.DoublePredicate;
+import java.util.function.IntPredicate;
+import java.util.function.LongPredicate;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+/**
+ * Factory for instances of a short-circuiting {@code TerminalOp} that implement
+ * quantified predicate matching on the elements of a stream. Supported variants
+ * include match-all, match-any, and match-none.
+ *
+ * @since 1.8
+ */
+final class MatchOps {
+
+ private MatchOps() { }
+
+ /**
+ * Enum describing quantified match options -- all match, any match, none
+ * match.
+ */
+ enum MatchKind {
+ /** Do all elements match the predicate? */
+ ANY(true, true),
+
+ /** Do any elements match the predicate? */
+ ALL(false, false),
+
+ /** Do no elements match the predicate? */
+ NONE(true, false);
+
+ private final boolean stopOnPredicateMatches;
+ private final boolean shortCircuitResult;
+
+ private MatchKind(boolean stopOnPredicateMatches,
+ boolean shortCircuitResult) {
+ this.stopOnPredicateMatches = stopOnPredicateMatches;
+ this.shortCircuitResult = shortCircuitResult;
+ }
+ }
+
+ /**
+ * Constructs a quantified predicate matcher for a Stream.
+ *
+ * @param <T> the type of stream elements
+ * @param predicate the {@code Predicate} to apply to stream elements
+ * @param matchKind the kind of quantified match (all, any, none)
+ * @return a {@code TerminalOp} implementing the desired quantified match
+ * criteria
+ */
+ public static <T> TerminalOp<T, Boolean> makeRef(Predicate<? super T> predicate,
+ MatchKind matchKind) {
+ Objects.requireNonNull(predicate);
+ Objects.requireNonNull(matchKind);
+ class MatchSink extends BooleanTerminalSink<T> {
+ MatchSink() {
+ super(matchKind);
+ }
+
+ @Override
+ public void accept(T t) {
+ if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
+ stop = true;
+ value = matchKind.shortCircuitResult;
+ }
+ }
+ }
+
+ // @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref
+ Supplier<BooleanTerminalSink<T>> s = new Supplier<BooleanTerminalSink<T>>() {
+ @Override
+ public BooleanTerminalSink<T> get() {return new MatchSink();}
+ };
+ return new MatchOp<>(StreamShape.REFERENCE, matchKind, s);
+ }
+
+ /**
+ * Constructs a quantified predicate matcher for an {@code IntStream}.
+ *
+ * @param predicate the {@code Predicate} to apply to stream elements
+ * @param matchKind the kind of quantified match (all, any, none)
+ * @return a {@code TerminalOp} implementing the desired quantified match
+ * criteria
+ */
+ public static TerminalOp<Integer, Boolean> makeInt(IntPredicate predicate,
+ MatchKind matchKind) {
+ Objects.requireNonNull(predicate);
+ Objects.requireNonNull(matchKind);
+ class MatchSink extends BooleanTerminalSink<Integer> implements Sink.OfInt {
+ MatchSink() {
+ super(matchKind);
+ }
+
+ @Override
+ public void accept(int t) {
+ if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
+ stop = true;
+ value = matchKind.shortCircuitResult;
+ }
+ }
+ }
+
+ // @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref
+ Supplier<BooleanTerminalSink<Integer>> s = new Supplier<BooleanTerminalSink<Integer>>() {
+ @Override
+ public BooleanTerminalSink<Integer> get() {return new MatchSink();}
+ };
+ return new MatchOp<>(StreamShape.INT_VALUE, matchKind, s);
+ }
+
+ /**
+ * Constructs a quantified predicate matcher for a {@code LongStream}.
+ *
+ * @param predicate the {@code Predicate} to apply to stream elements
+ * @param matchKind the kind of quantified match (all, any, none)
+ * @return a {@code TerminalOp} implementing the desired quantified match
+ * criteria
+ */
+ public static TerminalOp<Long, Boolean> makeLong(LongPredicate predicate,
+ MatchKind matchKind) {
+ Objects.requireNonNull(predicate);
+ Objects.requireNonNull(matchKind);
+ class MatchSink extends BooleanTerminalSink<Long> implements Sink.OfLong {
+
+ MatchSink() {
+ super(matchKind);
+ }
+
+ @Override
+ public void accept(long t) {
+ if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
+ stop = true;
+ value = matchKind.shortCircuitResult;
+ }
+ }
+ }
+
+ // @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref
+ Supplier<BooleanTerminalSink<Long>> s = new Supplier<BooleanTerminalSink<Long>>() {
+ @Override
+ public BooleanTerminalSink<Long> get() {return new MatchSink();}
+ };
+ return new MatchOp<>(StreamShape.LONG_VALUE, matchKind, s);
+ }
+
+ /**
+ * Constructs a quantified predicate matcher for a {@code DoubleStream}.
+ *
+ * @param predicate the {@code Predicate} to apply to stream elements
+ * @param matchKind the kind of quantified match (all, any, none)
+ * @return a {@code TerminalOp} implementing the desired quantified match
+ * criteria
+ */
+ public static TerminalOp<Double, Boolean> makeDouble(DoublePredicate predicate,
+ MatchKind matchKind) {
+ Objects.requireNonNull(predicate);
+ Objects.requireNonNull(matchKind);
+ class MatchSink extends BooleanTerminalSink<Double> implements Sink.OfDouble {
+
+ MatchSink() {
+ super(matchKind);
+ }
+
+ @Override
+ public void accept(double t) {
+ if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
+ stop = true;
+ value = matchKind.shortCircuitResult;
+ }
+ }
+ }
+
+ // @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref
+ Supplier<BooleanTerminalSink<Double>> s = new Supplier<BooleanTerminalSink<Double>>() {
+ @Override
+ public BooleanTerminalSink<Double> get() {return new MatchSink();}
+ };
+ return new MatchOp<>(StreamShape.DOUBLE_VALUE, matchKind, s);
+ }
+
+ /**
+ * A short-circuiting {@code TerminalOp} that evaluates a predicate on the
+ * elements of a stream and determines whether all, any or none of those
+ * elements match the predicate.
+ *
+ * @param <T> the output type of the stream pipeline
+ */
+ private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
+ private final StreamShape inputShape;
+ final MatchKind matchKind;
+ final Supplier<BooleanTerminalSink<T>> sinkSupplier;
+
+ /**
+ * Constructs a {@code MatchOp}.
+ *
+ * @param shape the output shape of the stream pipeline
+ * @param matchKind the kind of quantified match (all, any, none)
+ * @param sinkSupplier {@code Supplier} for a {@code Sink} of the
+ * appropriate shape which implements the matching operation
+ */
+ MatchOp(StreamShape shape,
+ MatchKind matchKind,
+ Supplier<BooleanTerminalSink<T>> sinkSupplier) {
+ this.inputShape = shape;
+ this.matchKind = matchKind;
+ this.sinkSupplier = sinkSupplier;
+ }
+
+ @Override
+ public int getOpFlags() {
+ return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
+ }
+
+ @Override
+ public StreamShape inputShape() {
+ return inputShape;
+ }
+
+ @Override
+ public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
+ Spliterator<S> spliterator) {
+ return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
+ }
+
+ @Override
+ public <S> Boolean evaluateParallel(PipelineHelper<T> helper,
+ Spliterator<S> spliterator) {
+ // Approach for parallel implementation:
+ // - Decompose as per usual
+ // - run match on leaf chunks, call result "b"
+ // - if b == matchKind.shortCircuitOn, complete early and return b
+ // - else if we complete normally, return !shortCircuitOn
+
+ return new MatchTask<>(this, helper, spliterator).invoke();
+ }
+ }
+
+ /**
+ * Boolean specific terminal sink to avoid the boxing costs when returning
+ * results. Subclasses implement the shape-specific functionality.
+ *
+ * @param <T> The output type of the stream pipeline
+ */
+ private static abstract class BooleanTerminalSink<T> implements Sink<T> {
+ boolean stop;
+ boolean value;
+
+ BooleanTerminalSink(MatchKind matchKind) {
+ value = !matchKind.shortCircuitResult;
+ }
+
+ public boolean getAndClearState() {
+ return value;
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return stop;
+ }
+ }
+
+ /**
+ * ForkJoinTask implementation to implement a parallel short-circuiting
+ * quantified match
+ *
+ * @param <P_IN> the type of source elements for the pipeline
+ * @param <P_OUT> the type of output elements for the pipeline
+ */
+ private static final class MatchTask<P_IN, P_OUT>
+ extends AbstractShortCircuitTask<P_IN, P_OUT, Boolean, MatchTask<P_IN, P_OUT>> {
+ private final MatchOp<P_OUT> op;
+
+ /**
+ * Constructor for root node
+ */
+ MatchTask(MatchOp<P_OUT> op, PipelineHelper<P_OUT> helper,
+ Spliterator<P_IN> spliterator) {
+ super(helper, spliterator);
+ this.op = op;
+ }
+
+ /**
+ * Constructor for non-root node
+ */
+ MatchTask(MatchTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
+ super(parent, spliterator);
+ this.op = parent.op;
+ }
+
+ @Override
+ protected MatchTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
+ return new MatchTask<>(this, spliterator);
+ }
+
+ @Override
+ protected Boolean doLeaf() {
+ boolean b = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).getAndClearState();
+ if (b == op.matchKind.shortCircuitResult)
+ shortCircuit(b);
+ return null;
+ }
+
+ @Override
+ protected Boolean getEmptyResult() {
+ return !op.matchKind.shortCircuitResult;
+ }
+ }
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/Node.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,557 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.DoubleConsumer;
+import java.util.function.IntConsumer;
+import java.util.function.IntFunction;
+import java.util.function.LongConsumer;
+
+/**
+ * An immutable container for describing an ordered sequence of elements of some
+ * type {@code T}.
+ *
+ * <p>A {@code Node} contains a fixed number of elements, which can be accessed
+ * via the {@link #count}, {@link #spliterator}, {@link #forEach},
+ * {@link #asArray}, or {@link #copyInto} methods. A {@code Node} may have zero
+ * or more child {@code Node}s; if it has no children (accessed via
+ * {@link #getChildCount} and {@link #getChild(int)}, it is considered <em>flat
+ * </em> or a <em>leaf</em>; if it has children, it is considered an
+ * <em>internal</em> node. The size of an internal node is the sum of sizes of
+ * its children.
+ *
+ * @apiNote
+ * <p>A {@code Node} typically does not store the elements directly, but instead
+ * mediates access to one or more existing (effectively immutable) data
+ * structures such as a {@code Collection}, array, or a set of other
+ * {@code Node}s. Commonly {@code Node}s are formed into a tree whose shape
+ * corresponds to the computation tree that produced the elements that are
+ * contained in the leaf nodes. The use of {@code Node} within the stream
+ * framework is largely to avoid copying data unnecessarily during parallel
+ * operations.
+ *
+ * @param <T> the type of elements.
+ * @since 1.8
+ */
+interface Node<T> {
+
+ /**
+ * Returns a {@link Spliterator} describing the elements contained in this
+ * {@code Node}.
+ *
+ * @return a {@code Spliterator} describing the elements contained in this
+ * {@code Node}
+ */
+ Spliterator<T> spliterator();
+
+ /**
+ * Traverses the elements of this node, and invoke the provided
+ * {@code Consumer} with each element. Elements are provided in encounter
+ * order if the source for the {@code Node} has a defined encounter order.
+ *
+ * @param consumer a {@code Consumer} that is to be invoked with each
+ * element in this {@code Node}
+ */
+ void forEach(Consumer<? super T> consumer);
+
+ /**
+ * Returns the number of child nodes of this node.
+ *
+ * @implSpec The default implementation returns zero.
+ *
+ * @return the number of child nodes
+ */
+ default int getChildCount() {
+ return 0;
+ }
+
+ /**
+ * Retrieves the child {@code Node} at a given index.
+ *
+ * @implSpec The default implementation always throws
+ * {@code IndexOutOfBoundsException}.
+ *
+ * @param i the index to the child node
+ * @return the child node
+ * @throws IndexOutOfBoundsException if the index is less than 0 or greater
+ * than or equal to the number of child nodes
+ */
+ default Node<T> getChild(int i) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ /**
+ * Provides an array view of the contents of this node.
+ *
+ * <p>Depending on the underlying implementation, this may return a
+ * reference to an internal array rather than a copy. Since the returned
+ * array may be shared, the returned array should not be modified. The
+ * {@code generator} function may be consulted to create the array if a new
+ * array needs to be created.
+ *
+ * @param generator a factory function which takes an integer parameter and
+ * returns a new, empty array of that size and of the appropriate
+ * array type
+ * @return an array containing the contents of this {@code Node}
+ */
+ T[] asArray(IntFunction<T[]> generator);
+
+ /**
+ * Copies the content of this {@code Node} into an array, starting at a
+ * given offset into the array. It is the caller's responsibility to ensure
+ * there is sufficient room in the array.
+ *
+ * @param array the array into which to copy the contents of this
+ * {@code Node}
+ * @param offset the starting offset within the array
+ * @throws IndexOutOfBoundsException if copying would cause access of data
+ * outside array bounds
+ * @throws NullPointerException if {@code array} is {@code null}
+ */
+ void copyInto(T[] array, int offset);
+
+ /**
+ * Gets the {@code StreamShape} associated with this {@code Node}.
+ *
+ * @implSpec The default in {@code Node} returns
+ * {@code StreamShape.REFERENCE}
+ *
+ * @return the stream shape associated with this node
+ */
+ default StreamShape getShape() {
+ return StreamShape.REFERENCE;
+ }
+
+ /**
+ * Returns the number of elements contained in this node.
+ *
+ * @return the number of elements contained in this node
+ */
+ long count();
+
+ /**
+ * A mutable builder for a {@code Node} that implements {@link Sink}, which
+ * builds a flat node containing the elements that have been pushed to it.
+ */
+ interface Builder<T> extends Sink<T> {
+
+ /**
+ * Builds the node. Should be called after all elements have been
+ * pushed and signalled with an invocation of {@link Sink#end()}.
+ *
+ * @return the resulting {@code Node}
+ */
+ Node<T> build();
+
+ /**
+ * Specialized @{code Node.Builder} for int elements
+ */
+ interface OfInt extends Node.Builder<Integer>, Sink.OfInt {
+ @Override
+ Node.OfInt build();
+ }
+
+ /**
+ * Specialized @{code Node.Builder} for long elements
+ */
+ interface OfLong extends Node.Builder<Long>, Sink.OfLong {
+ @Override
+ Node.OfLong build();
+ }
+
+ /**
+ * Specialized @{code Node.Builder} for double elements
+ */
+ interface OfDouble extends Node.Builder<Double>, Sink.OfDouble {
+ @Override
+ Node.OfDouble build();
+ }
+ }
+
+ /**
+ * Specialized {@code Node} for int elements
+ */
+ interface OfInt extends Node<Integer> {
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return a {@link Spliterator.OfInt} describing the elements of this
+ * node
+ */
+ @Override
+ Spliterator.OfInt spliterator();
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param consumer a {@code Consumer} that is to be invoked with each
+ * element in this {@code Node}. If this is an
+ * {@code IntConsumer}, it is cast to {@code IntConsumer} so the
+ * elements may be processed without boxing.
+ */
+ @Override
+ default void forEach(Consumer<? super Integer> consumer) {
+ if (consumer instanceof IntConsumer) {
+ forEach((IntConsumer) consumer);
+ }
+ else {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} calling Node.OfInt.forEachRemaining(Consumer)");
+ spliterator().forEachRemaining(consumer);
+ }
+ }
+
+ /**
+ * Traverses the elements of this node, and invoke the provided
+ * {@code IntConsumer} with each element.
+ *
+ * @param consumer a {@code IntConsumer} that is to be invoked with each
+ * element in this {@code Node}
+ */
+ void forEach(IntConsumer consumer);
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implSpec the default implementation invokes the generator to create
+ * an instance of an Integer[] array with a length of {@link #count()}
+ * and then invokes {@link #copyInto(Integer[], int)} with that
+ * Integer[] array at an offset of 0. This is not efficient and it is
+ * recommended to invoke {@link #asIntArray()}.
+ */
+ @Override
+ default Integer[] asArray(IntFunction<Integer[]> generator) {
+ Integer[] boxed = generator.apply((int) count());
+ copyInto(boxed, 0);
+ return boxed;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implSpec the default implementation invokes {@link #asIntArray()} to
+ * obtain an int[] array then and copies the elements from that int[]
+ * array into the boxed Integer[] array. This is not efficient and it
+ * is recommended to invoke {@link #copyInto(int[], int)}.
+ */
+ @Override
+ default void copyInto(Integer[] boxed, int offset) {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} calling Node.OfInt.copyInto(Integer[], int)");
+
+ int[] array = asIntArray();
+ for (int i = 0; i < array.length; i++) {
+ boxed[offset + i] = array[i];
+ }
+ }
+
+ @Override
+ default Node.OfInt getChild(int i) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ /**
+ * Views this node as an int[] array.
+ *
+ * <p>Depending on the underlying implementation this may return a
+ * reference to an internal array rather than a copy. It is the callers
+ * responsibility to decide if either this node or the array is utilized
+ * as the primary reference for the data.</p>
+ *
+ * @return an array containing the contents of this {@code Node}
+ */
+ int[] asIntArray();
+
+ /**
+ * Copies the content of this {@code Node} into an int[] array, starting
+ * at a given offset into the array. It is the caller's responsibility
+ * to ensure there is sufficient room in the array.
+ *
+ * @param array the array into which to copy the contents of this
+ * {@code Node}
+ * @param offset the starting offset within the array
+ * @throws IndexOutOfBoundsException if copying would cause access of
+ * data outside array bounds
+ * @throws NullPointerException if {@code array} is {@code null}
+ */
+ void copyInto(int[] array, int offset);
+
+ /**
+ * {@inheritDoc}
+ * @implSpec The default in {@code Node.OfInt} returns
+ * {@code StreamShape.INT_VALUE}
+ */
+ default StreamShape getShape() {
+ return StreamShape.INT_VALUE;
+ }
+
+ }
+
+ /**
+ * Specialized {@code Node} for long elements
+ */
+ interface OfLong extends Node<Long> {
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return a {@link Spliterator.OfLong} describing the elements of this
+ * node
+ */
+ @Override
+ Spliterator.OfLong spliterator();
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param consumer A {@code Consumer} that is to be invoked with each
+ * element in this {@code Node}. If this is an
+ * {@code LongConsumer}, it is cast to {@code LongConsumer} so
+ * the elements may be processed without boxing.
+ */
+ @Override
+ default void forEach(Consumer<? super Long> consumer) {
+ if (consumer instanceof LongConsumer) {
+ forEach((LongConsumer) consumer);
+ }
+ else {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} calling Node.OfLong.forEachRemaining(Consumer)");
+ spliterator().forEachRemaining(consumer);
+ }
+ }
+
+ /**
+ * Traverses the elements of this node, and invoke the provided
+ * {@code LongConsumer} with each element.
+ *
+ * @param consumer a {@code LongConsumer} that is to be invoked with
+ * each element in this {@code Node}
+ */
+ void forEach(LongConsumer consumer);
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implSpec the default implementation invokes the generator to create
+ * an instance of a Long[] array with a length of {@link #count()} and
+ * then invokes {@link #copyInto(Long[], int)} with that Long[] array at
+ * an offset of 0. This is not efficient and it is recommended to
+ * invoke {@link #asLongArray()}.
+ */
+ @Override
+ default Long[] asArray(IntFunction<Long[]> generator) {
+ Long[] boxed = generator.apply((int) count());
+ copyInto(boxed, 0);
+ return boxed;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implSpec the default implementation invokes {@link #asLongArray()}
+ * to obtain a long[] array then and copies the elements from that
+ * long[] array into the boxed Long[] array. This is not efficient and
+ * it is recommended to invoke {@link #copyInto(long[], int)}.
+ */
+ @Override
+ default void copyInto(Long[] boxed, int offset) {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} calling Node.OfInt.copyInto(Long[], int)");
+
+ long[] array = asLongArray();
+ for (int i = 0; i < array.length; i++) {
+ boxed[offset + i] = array[i];
+ }
+ }
+
+ @Override
+ default Node.OfLong getChild(int i) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ /**
+ * Views this node as a long[] array.
+ *
+ * <p/>Depending on the underlying implementation this may return a
+ * reference to an internal array rather than a copy. It is the callers
+ * responsibility to decide if either this node or the array is utilized
+ * as the primary reference for the data.
+ *
+ * @return an array containing the contents of this {@code Node}
+ */
+ long[] asLongArray();
+
+ /**
+ * Copies the content of this {@code Node} into a long[] array, starting
+ * at a given offset into the array. It is the caller's responsibility
+ * to ensure there is sufficient room in the array.
+ *
+ * @param array the array into which to copy the contents of this
+ * {@code Node}
+ * @param offset the starting offset within the array
+ * @throws IndexOutOfBoundsException if copying would cause access of
+ * data outside array bounds
+ * @throws NullPointerException if {@code array} is {@code null}
+ */
+ void copyInto(long[] array, int offset);
+
+ /**
+ * {@inheritDoc}
+ * @implSpec The default in {@code Node.OfLong} returns
+ * {@code StreamShape.LONG_VALUE}
+ */
+ default StreamShape getShape() {
+ return StreamShape.LONG_VALUE;
+ }
+
+
+ }
+
+ /**
+ * Specialized {@code Node} for double elements
+ */
+ interface OfDouble extends Node<Double> {
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return A {@link Spliterator.OfDouble} describing the elements of
+ * this node
+ */
+ @Override
+ Spliterator.OfDouble spliterator();
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param consumer A {@code Consumer} that is to be invoked with each
+ * element in this {@code Node}. If this is an
+ * {@code DoubleConsumer}, it is cast to {@code DoubleConsumer}
+ * so the elements may be processed without boxing.
+ */
+ @Override
+ default void forEach(Consumer<? super Double> consumer) {
+ if (consumer instanceof DoubleConsumer) {
+ forEach((DoubleConsumer) consumer);
+ }
+ else {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} calling Node.OfLong.forEachRemaining(Consumer)");
+ spliterator().forEachRemaining(consumer);
+ }
+ }
+
+ /**
+ * Traverses the elements of this node, and invoke the provided
+ * {@code DoubleConsumer} with each element.
+ *
+ * @param consumer A {@code DoubleConsumer} that is to be invoked with
+ * each element in this {@code Node}
+ */
+ void forEach(DoubleConsumer consumer);
+
+ //
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implSpec the default implementation invokes the generator to create
+ * an instance of a Double[] array with a length of {@link #count()} and
+ * then invokes {@link #copyInto(Double[], int)} with that Double[]
+ * array at an offset of 0. This is not efficient and it is recommended
+ * to invoke {@link #asDoubleArray()}.
+ */
+ @Override
+ default Double[] asArray(IntFunction<Double[]> generator) {
+ Double[] boxed = generator.apply((int) count());
+ copyInto(boxed, 0);
+ return boxed;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implSpec the default implementation invokes {@link #asDoubleArray()}
+ * to obtain a double[] array then and copies the elements from that
+ * double[] array into the boxed Double[] array. This is not efficient
+ * and it is recommended to invoke {@link #copyInto(double[], int)}.
+ */
+ @Override
+ default void copyInto(Double[] boxed, int offset) {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} calling Node.OfDouble.copyInto(Double[], int)");
+
+ double[] array = asDoubleArray();
+ for (int i = 0; i < array.length; i++) {
+ boxed[offset + i] = array[i];
+ }
+ }
+
+ @Override
+ default Node.OfDouble getChild(int i) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ /**
+ * Views this node as a double[] array.
+ *
+ * <p/>Depending on the underlying implementation this may return a
+ * reference to an internal array rather than a copy. It is the callers
+ * responsibility to decide if either this node or the array is utilized
+ * as the primary reference for the data.
+ *
+ * @return an array containing the contents of this {@code Node}
+ */
+ double[] asDoubleArray();
+
+ /**
+ * Copies the content of this {@code Node} into a double[] array, starting
+ * at a given offset into the array. It is the caller's responsibility
+ * to ensure there is sufficient room in the array.
+ *
+ * @param array the array into which to copy the contents of this
+ * {@code Node}
+ * @param offset the starting offset within the array
+ * @throws IndexOutOfBoundsException if copying would cause access of
+ * data outside array bounds
+ * @throws NullPointerException if {@code array} is {@code null}
+ */
+ void copyInto(double[] array, int offset);
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implSpec The default in {@code Node.OfDouble} returns
+ * {@code StreamShape.DOUBLE_VALUE}
+ */
+ default StreamShape getShape() {
+ return StreamShape.DOUBLE_VALUE;
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/PipelineHelper.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,188 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.function.IntFunction;
+
+/**
+ * Helper class for executing <a href="package-summary.html#StreamPipelines">
+ * stream pipelines</a>, capturing all of the information about a stream
+ * pipeline (output shape, intermediate operations, stream flags, parallelism,
+ * etc) in one place.
+ *
+ * <p>
+ * A {@code PipelineHelper} describes the initial segment of a stream pipeline,
+ * including its source, intermediate operations, and may additionally
+ * incorporate information about the terminal (or stateful) operation which
+ * follows the last intermediate operation described by this
+ * {@code PipelineHelper}. The {@code PipelineHelper} is passed to the
+ * {@link TerminalOp#evaluateParallel(PipelineHelper, java.util.Spliterator)},
+ * {@link TerminalOp#evaluateSequential(PipelineHelper, java.util.Spliterator)},
+ * and {@link AbstractPipeline#opEvaluateParallel(PipelineHelper, java.util.Spliterator,
+ * java.util.function.IntFunction)}, methods, which can use the
+ * {@code PipelineHelper} to access information about the pipeline such as
+ * input shape, output shape, stream flags, and size, and use the helper methods
+ * such as {@link #wrapAndCopyInto(Sink, Spliterator)},
+ * {@link #copyInto(Sink, Spliterator)}, and {@link #wrapSink(Sink)} to execute
+ * pipeline operations.
+ *
+ * @param <P_OUT> type of output elements from the pipeline
+ * @since 1.8
+ */
+abstract class PipelineHelper<P_OUT> {
+
+ /**
+ * Gets the combined stream and operation flags for the output of the described
+ * pipeline. This will incorporate stream flags from the stream source, all
+ * the intermediate operations and the terminal operation.
+ *
+ * @return the combined stream and operation flags
+ * @see StreamOpFlag
+ */
+ abstract int getStreamAndOpFlags();
+
+ /**
+ * Returns the exact output size of the portion of the output resulting from
+ * applying the pipeline stages described by this {@code PipelineHelper} to
+ * the the portion of the input described by the provided
+ * {@code Spliterator}, if known. If not known or known infinite, will
+ * return {@code -1}.
+ *
+ * @apiNote
+ * The exact output size is known if the {@code Spliterator} has the
+ * {@code SIZED} characteristic, and the operation flags
+ * {@link StreamOpFlag#SIZED} is known on the combined stream and operation
+ * flags.
+ *
+ * @param spliterator the spliterator describing the relevant portion of the
+ * source data
+ * @return the exact size if known, or -1 if infinite or unknown
+ */
+ abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
+
+ /**
+ * Applies the pipeline stages described by this {@code PipelineHelper} to
+ * the provided {@code Spliterator} and send the results to the provided
+ * {@code Sink}.
+ *
+ * @implSpec
+ * The implementation behaves as if:
+ * <pre>{@code
+ * intoWrapped(wrapSink(sink), spliterator);
+ * }</pre>
+ *
+ * @param sink the {@code Sink} to receive the results
+ * @param spliterator the spliterator describing the source input to process
+ */
+ abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
+
+ /**
+ * Pushes elements obtained from the {@code Spliterator} into the provided
+ * {@code Sink}. If the stream pipeline is known to have short-circuiting
+ * stages in it (see {@link StreamOpFlag#SHORT_CIRCUIT}), the
+ * {@link Sink#cancellationRequested()} is checked after each
+ * element, stopping if cancellation is requested.
+ *
+ * @implSpec
+ * This method conforms to the {@code Sink} protocol of calling
+ * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and
+ * calling {@code Sink.end} after all elements have been pushed.
+ *
+ * @param wrappedSink the destination {@code Sink}
+ * @param spliterator the source {@code Spliterator}
+ */
+ abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
+
+ /**
+ * Pushes elements obtained from the {@code Spliterator} into the provided
+ * {@code Sink}, checking {@link Sink#cancellationRequested()} after each
+ * element, and stopping if cancellation is requested.
+ *
+ * @implSpec
+ * This method conforms to the {@code Sink} protocol of calling
+ * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and
+ * calling {@code Sink.end} after all elements have been pushed or if
+ * cancellation is requested.
+ *
+ * @param wrappedSink the destination {@code Sink}
+ * @param spliterator the source {@code Spliterator}
+ */
+ abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
+
+ /**
+ * Takes a {@code Sink} that accepts elements of the output type of the
+ * {@code PipelineHelper}, and wrap it with a {@code Sink} that accepts
+ * elements of the input type and implements all the intermediate operations
+ * described by this {@code PipelineHelper}, delivering the result into the
+ * provided {@code Sink}.
+ *
+ * @param sink the {@code Sink} to receive the results
+ * @return a {@code Sink} that implements the pipeline stages and sends
+ * results to the provided {@code Sink}
+ */
+ abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
+
+ /**
+ * Constructs a @{link Node.Builder} compatible with the output shape of
+ * this {@code PipelineHelper}.
+ *
+ * @param exactSizeIfKnown if >=0 then a builder will be created that has a
+ * fixed capacity of exactly sizeIfKnown elements; if < 0 then the
+ * builder has variable capacity. A fixed capacity builder will fail
+ * if an element is added after the builder has reached capacity.
+ * @param generator a factory function for array instances
+ * @return a {@code Node.Builder} compatible with the output shape of this
+ * {@code PipelineHelper}
+ */
+ abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,
+ IntFunction<P_OUT[]> generator);
+
+ /**
+ * Collects all output elements resulting from applying the pipeline stages
+ * to the source {@code Spliterator} into a {@code Node}.
+ *
+ * @implNote
+ * If the pipeline has no intermediate operations and the source is backed
+ * by a {@code Node} then that {@code Node} will be returned (or flattened
+ * and then returned). This reduces copying for a pipeline consisting of a
+ * stateful operation followed by a terminal operation that returns an
+ * array, such as:
+ * <pre>{@code
+ * stream.sorted().toArray();
+ * }</pre>
+ *
+ * @param spliterator the source {@code Spliterator}
+ * @param flatten if true and the pipeline is a parallel pipeline then the
+ * {@code Node} returned will contain no children, otherwise the
+ * {@code Node} may represent the root in a tree that reflects the
+ * shape of the computation tree.
+ * @param generator a factory function for array instances
+ * @return the {@code Node} containing all output elements
+ */
+ abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,
+ boolean flatten,
+ IntFunction<P_OUT[]> generator);
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/Sink.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,362 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.DoubleConsumer;
+import java.util.function.IntConsumer;
+import java.util.function.LongConsumer;
+
+/**
+ * An extension of {@link Consumer} used to conduct values through the stages of
+ * a stream pipeline, with additional methods to manage size information,
+ * control flow, etc. Before calling the {@code accept()} method on a
+ * {@code Sink} for the first time, you must first call the {@code begin()}
+ * method to inform it that data is coming (optionally informing the sink how
+ * much data is coming), and after all data has been sent, you must call the
+ * {@code end()} method. After calling {@code end()}, you should not call
+ * {@code accept()} without again calling {@code begin()}. {@code Sink} also
+ * offers a mechanism by which the sink can cooperatively signal that it does
+ * not wish to receive any more data (the {@code cancellationRequested()}
+ * method), which a source can poll before sending more data to the
+ * {@code Sink}.
+ *
+ * <p>A sink may be in one of two states: an initial state and an active state.
+ * It starts out in the initial state; the {@code begin()} method transitions
+ * it to the active state, and the {@code end()} method transitions it back into
+ * the initial state, where it can be re-used. Data-accepting methods (such as
+ * {@code accept()} are only valid in the active state.
+ *
+ * @apiNote
+ * A stream pipeline consists of a source, zero or more intermediate stages
+ * (such as filtering or mapping), and a terminal stage, such as reduction or
+ * for-each. For concreteness, consider the pipeline:
+ *
+ * <pre>{@code
+ * int longestStringLengthStartingWithA
+ * = strings.stream()
+ * .filter(s -> s.startsWith("A"))
+ * .mapToInt(String::length)
+ * .max();
+ * }</pre>
+ *
+ * <p>Here, we have three stages, filtering, mapping, and reducing. The
+ * filtering stage consumes strings and emits a subset of those strings; the
+ * mapping stage consumes strings and emits ints; the reduction stage consumes
+ * those ints and computes the maximal value.
+ *
+ * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
+ * whether the stage accepts objects, ints, longs, or doubles. Sink has entry
+ * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
+ * not need a specialized interface for each primitive specialization. (It
+ * might be called a "kitchen sink" for this omnivorous tendency.) The entry
+ * point to the pipeline is the {@code Sink} for the filtering stage, which
+ * sends some elements "downstream" -- into the {@code Sink} for the mapping
+ * stage, which in turn sends integral values downstream into the {@code Sink}
+ * for the reduction stage. The {@code Sink} implementations associated with a
+ * given stage is expected to know the data type for the next stage, and call
+ * the correct {@code accept} method on its downstream {@code Sink}. Similarly,
+ * each stage must implement the correct {@code accept} method corresponding to
+ * the data type it accepts.
+ *
+ * <p>The specialized subtypes such as {@link Sink.OfInt} override
+ * {@code accept(Object)} to call the appropriate primitive specialization of
+ * {@code accept}, implement the appropriate primitive specialization of
+ * {@code Consumer}, and re-abstract the appropriate primitive specialization of
+ * {@code accept}.
+ *
+ * <p>The chaining subtypes such as {@link ChainedInt} not only implement
+ * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
+ * represents the downstream {@code Sink}, and implement the methods
+ * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
+ * delegate to the downstream {@code Sink}. Most implementations of
+ * intermediate operations will use these chaining wrappers. For example, the
+ * mapping stage in the above example would look like:
+ *
+ * <pre>{@code
+ * IntSink is = new Sink.ChainedReference<U>(sink) {
+ * public void accept(U u) {
+ * downstream.accept(mapper.applyAsInt(u));
+ * }
+ * };
+ * }</pre>
+ *
+ * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
+ * to receive elements of type {@code U} as input, and pass the downstream sink
+ * to the constructor. Because the next stage expects to receive integers, we
+ * must call the {@code accept(int)} method when emitting values to the downstream.
+ * The {@code accept()} method applies the mapping function from {@code U} to
+ * {@code int} and passes the resulting value to the downstream {@code Sink}.
+ *
+ * @param <T> type of elements for value streams
+ * @since 1.8
+ */
+interface Sink<T> extends Consumer<T> {
+ /**
+ * Resets the sink state to receive a fresh data set. This must be called
+ * before sending any data to the sink. After calling {@link #end()},
+ * you may call this method to reset the sink for another calculation.
+ * @param size The exact size of the data to be pushed downstream, if
+ * known or {@code -1} if unknown or infinite.
+ *
+ * <p>Prior to this call, the sink must be in the initial state, and after
+ * this call it is in the active state.
+ */
+ default void begin(long size) {}
+
+ /**
+ * Indicates that all elements have been pushed. If the {@code Sink} is
+ * stateful, it should send any stored state downstream at this time, and
+ * should clear any accumulated state (and associated resources).
+ *
+ * <p>Prior to this call, the sink must be in the active state, and after
+ * this call it is returned to the initial state.
+ */
+ default void end() {}
+
+ /**
+ * Indicates that this {@code Sink} does not wish to receive any more data.
+ *
+ * @implSpec The default implementation always returns false.
+ *
+ * @return true if cancellation is requested
+ */
+ default boolean cancellationRequested() {
+ return false;
+ }
+
+ /**
+ * Accepts an int value.
+ *
+ * @implSpec The default implementation throws IllegalStateException.
+ *
+ * @throws IllegalStateException if this sink does not accept int values
+ */
+ default void accept(int value) {
+ throw new IllegalStateException("called wrong accept method");
+ }
+
+ /**
+ * Accepts a long value.
+ *
+ * @implSpec The default implementation throws IllegalStateException.
+ *
+ * @throws IllegalStateException if this sink does not accept long values
+ */
+ default void accept(long value) {
+ throw new IllegalStateException("called wrong accept method");
+ }
+
+ /**
+ * Accepts a double value.
+ *
+ * @implSpec The default implementation throws IllegalStateException.
+ *
+ * @throws IllegalStateException if this sink does not accept double values
+ */
+ default void accept(double value) {
+ throw new IllegalStateException("called wrong accept method");
+ }
+
+ /**
+ * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
+ * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
+ * {@code accept(int)}.
+ */
+ interface OfInt extends Sink<Integer>, IntConsumer {
+ @Override
+ void accept(int value);
+
+ @Override
+ default void accept(Integer i) {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
+ accept(i.intValue());
+ }
+ }
+
+ /**
+ * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
+ * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
+ * {@code accept(long)}.
+ */
+ interface OfLong extends Sink<Long>, LongConsumer {
+ @Override
+ void accept(long value);
+
+ @Override
+ default void accept(Long i) {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
+ accept(i.longValue());
+ }
+ }
+
+ /**
+ * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
+ * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
+ * {@code accept(double)}.
+ */
+ interface OfDouble extends Sink<Double>, DoubleConsumer {
+ @Override
+ void accept(double value);
+
+ @Override
+ default void accept(Double i) {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
+ accept(i.doubleValue());
+ }
+ }
+
+ /**
+ * Abstract {@code Sink} implementation for creating chains of
+ * sinks. The {@code begin}, {@code end}, and
+ * {@code cancellationRequested} methods are wired to chain to the
+ * downstream {@code Sink}. This implementation takes a downstream
+ * {@code Sink} of unknown input shape and produces a {@code Sink<T>}. The
+ * implementation of the {@code accept()} method must call the correct
+ * {@code accept()} method on the downstream {@code Sink}.
+ */
+ static abstract class ChainedReference<T> implements Sink<T> {
+ protected final Sink downstream;
+
+ public ChainedReference(Sink downstream) {
+ this.downstream = Objects.requireNonNull(downstream);
+ }
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(size);
+ }
+
+ @Override
+ public void end() {
+ downstream.end();
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return downstream.cancellationRequested();
+ }
+ }
+
+ /**
+ * Abstract {@code Sink} implementation designed for creating chains of
+ * sinks. The {@code begin}, {@code end}, and
+ * {@code cancellationRequested} methods are wired to chain to the
+ * downstream {@code Sink}. This implementation takes a downstream
+ * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
+ * The implementation of the {@code accept()} method must call the correct
+ * {@code accept()} method on the downstream {@code Sink}.
+ */
+ static abstract class ChainedInt implements Sink.OfInt {
+ protected final Sink downstream;
+
+ public ChainedInt(Sink downstream) {
+ this.downstream = Objects.requireNonNull(downstream);
+ }
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(size);
+ }
+
+ @Override
+ public void end() {
+ downstream.end();
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return downstream.cancellationRequested();
+ }
+ }
+
+ /**
+ * Abstract {@code Sink} implementation designed for creating chains of
+ * sinks. The {@code begin}, {@code end}, and
+ * {@code cancellationRequested} methods are wired to chain to the
+ * downstream {@code Sink}. This implementation takes a downstream
+ * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
+ * The implementation of the {@code accept()} method must call the correct
+ * {@code accept()} method on the downstream {@code Sink}.
+ */
+ static abstract class ChainedLong implements Sink.OfLong {
+ protected final Sink downstream;
+
+ public ChainedLong(Sink downstream) {
+ this.downstream = Objects.requireNonNull(downstream);
+ }
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(size);
+ }
+
+ @Override
+ public void end() {
+ downstream.end();
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return downstream.cancellationRequested();
+ }
+ }
+
+ /**
+ * Abstract {@code Sink} implementation designed for creating chains of
+ * sinks. The {@code begin}, {@code end}, and
+ * {@code cancellationRequested} methods are wired to chain to the
+ * downstream {@code Sink}. This implementation takes a downstream
+ * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
+ * The implementation of the {@code accept()} method must call the correct
+ * {@code accept()} method on the downstream {@code Sink}.
+ */
+ static abstract class ChainedDouble implements Sink.OfDouble {
+ protected final Sink downstream;
+
+ public ChainedDouble(Sink downstream) {
+ this.downstream = Objects.requireNonNull(downstream);
+ }
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(size);
+ }
+
+ @Override
+ public void end() {
+ downstream.end();
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return downstream.cancellationRequested();
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/StreamOpFlag.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,753 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Spliterator;
+
+/**
+ * Flags corresponding to characteristics of streams and operations. Flags are
+ * utilized by the stream framework to control, specialize or optimize
+ * computation.
+ *
+ * <p>
+ * Stream flags may be used to describe characteristics of several different
+ * entities associated with streams: stream sources, intermediate operations,
+ * and terminal operations. Not all stream flags are meaningful for all
+ * entities; the following table summarizes which flags are meaningful in what
+ * contexts:
+ *
+ * <div>
+ * <table>
+ * <caption>Type Characteristics</caption>
+ * <thead class="tableSubHeadingColor">
+ * <tr>
+ * <th colspan="2"> </th>
+ * <th>{@code DISTINCT}</th>
+ * <th>{@code SORTED}</th>
+ * <th>{@code ORDERED}</th>
+ * <th>{@code SIZED}</th>
+ * <th>{@code SHORT_CIRCUIT}</th>
+ * </tr>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <th colspan="2" class="tableSubHeadingColor">Stream source</th>
+ * <td>Y</td>
+ * <td>Y</td>
+ * <td>Y</td>
+ * <td>Y</td>
+ * <td>N</td>
+ * </tr>
+ * <tr>
+ * <th colspan="2" class="tableSubHeadingColor">Intermediate operation</th>
+ * <td>PCI</td>
+ * <td>PCI</td>
+ * <td>PCI</td>
+ * <td>PC</td>
+ * <td>PI</td>
+ * </tr>
+ * <tr>
+ * <th colspan="2" class="tableSubHeadingColor">Terminal operation</th>
+ * <td>N</td>
+ * <td>N</td>
+ * <td>PC</td>
+ * <td>N</td>
+ * <td>PI</td>
+ * </tr>
+ * </tbody>
+ * <tfoot>
+ * <tr>
+ * <th class="tableSubHeadingColor" colspan="2">Legend</th>
+ * <th colspan="6" rowspan="7"> </th>
+ * </tr>
+ * <tr>
+ * <th class="tableSubHeadingColor">Flag</th>
+ * <th class="tableSubHeadingColor">Meaning</th>
+ * <th colspan="6"></th>
+ * </tr>
+ * <tr><td>Y</td><td>Allowed</td></tr>
+ * <tr><td>N</td><td>Invalid</td></tr>
+ * <tr><td>P</td><td>Preserves</td></tr>
+ * <tr><td>C</td><td>Clears</td></tr>
+ * <tr><td>I</td><td>Injects</td></tr>
+ * </tfoot>
+ * </table>
+ * </div>
+ *
+ * <p>In the above table, "PCI" means "may preserve, clear, or inject"; "PC"
+ * means "may preserve or clear", "PI" means "may preserve or inject", and "N"
+ * means "not valid".
+ *
+ * <p>Stream flags are represented by unioned bit sets, so that a single word
+ * may describe all the characteristics of a given stream entity, and that, for
+ * example, the flags for a stream source can be efficiently combined with the
+ * flags for later operations on that stream.
+ *
+ * <p>The bit masks {@link #STREAM_MASK}, {@link #OP_MASK}, and
+ * {@link #TERMINAL_OP_MASK} can be ANDed with a bit set of stream flags to
+ * produce a mask containing only the valid flags for that entity type.
+ *
+ * <p>When describing a stream source, one only need describe what
+ * characteristics that stream has; when describing a stream operation, one need
+ * describe whether the operation preserves, injects, or clears that
+ * characteristic. Accordingly, two bits are used for each flag, so as to allow
+ * representing not only the presence of of a characteristic, but how an
+ * operation modifies that characteristic. There are two common forms in which
+ * flag bits are combined into an {@code int} bit set. <em>Stream flags</em>
+ * are a unioned bit set constructed by ORing the enum characteristic values of
+ * {@link #set()} (or, more commonly, ORing the corresponding static named
+ * constants prefixed with {@code IS_}). <em>Operation flags</em> are a unioned
+ * bit set constructed by ORing the enum characteristic values of {@link #set()}
+ * or {@link #clear()} (to inject, or clear, respectively, the corresponding
+ * flag), or more commonly ORing the corresponding named constants prefixed with
+ * {@code IS_} or {@code NOT_}. Flags that are not marked with {@code IS_} or
+ * {@code NOT_} are implicitly treated as preserved. Care must be taken when
+ * combining bitsets that the correct combining operations are applied in the
+ * correct order.
+ *
+ * <p>
+ * With the exception of {@link #SHORT_CIRCUIT}, stream characteristics can be
+ * derived from the equivalent {@link java.util.Spliterator} characteristics:
+ * {@link java.util.Spliterator#DISTINCT}, {@link java.util.Spliterator#SORTED},
+ * {@link java.util.Spliterator#ORDERED}, and
+ * {@link java.util.Spliterator#SIZED}. A spliterator characteristics bit set
+ * can be converted to stream flags using the method
+ * {@link #fromCharacteristics(java.util.Spliterator)} and converted back using
+ * {@link #toCharacteristics(int)}. (The bit set
+ * {@link #SPLITERATOR_CHARACTERISTICS_MASK} is used to AND with a bit set to
+ * produce a valid spliterator characteristics bit set that can be converted to
+ * stream flags.)
+ *
+ * <p>
+ * The source of a stream encapsulates a spliterator. The characteristics of
+ * that source spliterator when transformed to stream flags will be a proper
+ * subset of stream flags of that stream.
+ * For example:
+ * <pre> {@code
+ * Spliterator s = ...;
+ * Stream stream = Streams.stream(s);
+ * flagsFromSplitr = fromCharacteristics(s.characteristics());
+ * assert(flagsFromSplitr & stream.getStreamFlags() == flagsFromSplitr);
+ * }</pre>
+ *
+ * <p>
+ * An intermediate operation, performed on an input stream to create a new
+ * output stream, may preserve, clear or inject stream or operation
+ * characteristics. Similarly, a terminal operation, performed on an input
+ * stream to produce an output result may preserve, clear or inject stream or
+ * operation characteristics. Preservation means that if that characteristic
+ * is present on the input, then it is also present on the output. Clearing
+ * means that the characteristic is not present on the output regardless of the
+ * input. Injection means that the characteristic is present on the output
+ * regardless of the input. If a characteristic is not cleared or injected then
+ * it is implicitly preserved.
+ *
+ * <p>
+ * A pipeline consists of a stream source encapsulating a spliterator, one or
+ * more intermediate operations, and finally a terminal operation that produces
+ * a result. At each stage of the pipeline, a combined stream and operation
+ * flags can be calculated, using {@link #combineOpFlags(int, int)}. Such flags
+ * ensure that preservation, clearing and injecting information is retained at
+ * each stage.
+ *
+ * The combined stream and operation flags for the source stage of the pipeline
+ * is calculated as follows:
+ * <pre> {@code
+ * int flagsForSourceStage = combineOpFlags(sourceFlags, INITIAL_OPS_VALUE);
+ * }</pre>
+ *
+ * The combined stream and operation flags of each subsequent intermediate
+ * operation stage in the pipeline is calculated as follows:
+ * <pre> {@code
+ * int flagsForThisStage = combineOpFlags(flagsForPreviousStage, thisOpFlags);
+ * }</pre>
+ *
+ * Finally the flags output from the last intermediate operation of the pipeline
+ * are combined with the operation flags of the terminal operation to produce
+ * the flags output from the pipeline.
+ *
+ * <p>Those flags can then be used to apply optimizations. For example, if
+ * {@code SIZED.isKnown(flags)} returns true then the stream size remains
+ * constant throughout the pipeline, this information can be utilized to
+ * pre-allocate data structures and combined with
+ * {@link java.util.Spliterator#SUBSIZED} that information can be utilized to
+ * perform concurrent in-place updates into a shared array.
+ *
+ * For specific details see the {@link AbstractPipeline} constructors.
+ *
+ * @since 1.8
+ */
+enum StreamOpFlag {
+
+ /*
+ * Each characteristic takes up 2 bits in a bit set to accommodate
+ * preserving, clearing and setting/injecting information.
+ *
+ * This applies to stream flags, intermediate/terminal operation flags, and
+ * combined stream and operation flags. Even though the former only requires
+ * 1 bit of information per characteristic, is it more efficient when
+ * combining flags to align set and inject bits.
+ *
+ * Characteristics belong to certain types, see the Type enum. Bit masks for
+ * the types are constructed as per the following table:
+ *
+ * DISTINCT SORTED ORDERED SIZED SHORT_CIRCUIT
+ * SPLITERATOR 01 01 01 01 00
+ * STREAM 01 01 01 01 00
+ * OP 11 11 11 10 01
+ * TERMINAL_OP 00 00 10 00 01
+ * UPSTREAM_TERMINAL_OP 00 00 10 00 00
+ *
+ * 01 = set/inject
+ * 10 = clear
+ * 11 = preserve
+ *
+ * Construction of the columns is performed using a simple builder for
+ * non-zero values.
+ */
+
+
+ // The following flags correspond to characteristics on Spliterator
+ // and the values MUST be equal.
+ //
+
+ /**
+ * Characteristic value signifying that, for each pair of
+ * encountered elements in a stream {@code x, y}, {@code !x.equals(y)}.
+ * <p>
+ * A stream may have this value or an intermediate operation can preserve,
+ * clear or inject this value.
+ */
+ // 0, 0x00000001
+ // Matches Spliterator.DISTINCT
+ DISTINCT(0,
+ set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
+
+ /**
+ * Characteristic value signifying that encounter order follows a natural
+ * sort order of comparable elements.
+ * <p>
+ * A stream can have this value or an intermediate operation can preserve,
+ * clear or inject this value.
+ * <p>
+ * Note: The {@link java.util.Spliterator#SORTED} characteristic can define
+ * a sort order with an associated non-null comparator. Augmenting flag
+ * state with addition properties such that those properties can be passed
+ * to operations requires some disruptive changes for a singular use-case.
+ * Furthermore, comparing comparators for equality beyond that of identity
+ * is likely to be unreliable. Therefore the {@code SORTED} characteristic
+ * for a defined non-natural sort order is not mapped internally to the
+ * {@code SORTED} flag.
+ */
+ // 1, 0x00000004
+ // Matches Spliterator.SORTED
+ SORTED(1,
+ set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
+
+ /**
+ * Characteristic value signifying that an encounter order is
+ * defined for stream elements.
+ * <p>
+ * A stream can have this value, an intermediate operation can preserve,
+ * clear or inject this value, or a terminal operation can preserve or clear
+ * this value.
+ */
+ // 2, 0x00000010
+ // Matches Spliterator.ORDERED
+ ORDERED(2,
+ set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP).clear(Type.TERMINAL_OP)
+ .clear(Type.UPSTREAM_TERMINAL_OP)),
+
+ /**
+ * Characteristic value signifying that size of the stream
+ * is of a known finite size that is equal to the known finite
+ * size of the source spliterator input to the first stream
+ * in the pipeline.
+ * <p>
+ * A stream can have this value or an intermediate operation can preserve or
+ * clear this value.
+ */
+ // 3, 0x00000040
+ // Matches Spliterator.SIZED
+ SIZED(3,
+ set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)),
+
+ // The following Spliterator characteristics are not currently used but a
+ // gap in the bit set is deliberately retained to enable corresponding
+ // stream flags if//when required without modification to other flag values.
+ //
+ // 4, 0x00000100 NONNULL(4, ...
+ // 5, 0x00000400 IMMUTABLE(5, ...
+ // 6, 0x00001000 CONCURRENT(6, ...
+ // 7, 0x00004000 SUBSIZED(7, ...
+
+ // The following 4 flags are currently undefined and a free for any further
+ // spliterator characteristics.
+ //
+ // 8, 0x00010000
+ // 9, 0x00040000
+ // 10, 0x00100000
+ // 11, 0x00400000
+
+ // The following flags are specific to streams and operations
+ //
+
+ /**
+ * Characteristic value signifying that an operation may short-circuit the
+ * stream.
+ * <p>
+ * An intermediate operation can preserve or inject this value,
+ * or a terminal operation can preserve or inject this value.
+ */
+ // 12, 0x01000000
+ SHORT_CIRCUIT(12,
+ set(Type.OP).set(Type.TERMINAL_OP));
+
+ // The following 2 flags are currently undefined and a free for any further
+ // stream flags if/when required
+ //
+ // 13, 0x04000000
+ // 14, 0x10000000
+ // 15, 0x40000000
+
+ /**
+ * Type of a flag
+ */
+ enum Type {
+ /**
+ * The flag is associated with spliterator characteristics.
+ */
+ SPLITERATOR,
+
+ /**
+ * The flag is associated with stream flags.
+ */
+ STREAM,
+
+ /**
+ * The flag is associated with intermediate operation flags.
+ */
+ OP,
+
+ /**
+ * The flag is associated with terminal operation flags.
+ */
+ TERMINAL_OP,
+
+ /**
+ * The flag is associated with terminal operation flags that are
+ * propagated upstream across the last stateful operation boundary
+ */
+ UPSTREAM_TERMINAL_OP
+ }
+
+ /**
+ * The bit pattern for setting/injecting a flag.
+ */
+ private static final int SET_BITS = 0b01;
+
+ /**
+ * The bit pattern for clearing a flag.
+ */
+ private static final int CLEAR_BITS = 0b10;
+
+ /**
+ * The bit pattern for preserving a flag.
+ */
+ private static final int PRESERVE_BITS = 0b11;
+
+ private static MaskBuilder set(Type t) {
+ return new MaskBuilder(new EnumMap<>(Type.class)).set(t);
+ }
+
+ private static class MaskBuilder {
+ final Map<Type, Integer> map;
+
+ MaskBuilder(Map<Type, Integer> map) {
+ this.map = map;
+ }
+
+ MaskBuilder mask(Type t, Integer i) {
+ map.put(t, i);
+ return this;
+ }
+
+ MaskBuilder set(Type t) {
+ return mask(t, SET_BITS);
+ }
+
+ MaskBuilder clear(Type t) {
+ return mask(t, CLEAR_BITS);
+ }
+
+ MaskBuilder setAndClear(Type t) {
+ return mask(t, PRESERVE_BITS);
+ }
+
+ Map<Type, Integer> build() {
+ for (Type t : Type.values()) {
+ map.putIfAbsent(t, 0b00);
+ }
+ return map;
+ }
+ }
+
+ /**
+ * The mask table for a flag, this is used to determine if a flag
+ * corresponds to a certain flag type and for creating mask constants.
+ */
+ private final Map<Type, Integer> maskTable;
+
+ /**
+ * The bit position in the bit mask.
+ */
+ private final int bitPosition;
+
+ /**
+ * The set 2 bit set offset at the bit position.
+ */
+ private final int set;
+
+ /**
+ * The clear 2 bit set offset at the bit position.
+ */
+ private final int clear;
+
+ /**
+ * The preserve 2 bit set offset at the bit position.
+ */
+ private final int preserve;
+
+ private StreamOpFlag(int position, MaskBuilder maskBuilder) {
+ this.maskTable = maskBuilder.build();
+ // Two bits per flag
+ position *= 2;
+ this.bitPosition = position;
+ this.set = SET_BITS << position;
+ this.clear = CLEAR_BITS << position;
+ this.preserve = PRESERVE_BITS << position;
+ }
+
+ /**
+ * Gets the bitmap associated with setting this characteristic.
+ *
+ * @return the bitmap for setting this characteristic
+ */
+ int set() {
+ return set;
+ }
+
+ /**
+ * Gets the bitmap associated with clearing this characteristic.
+ *
+ * @return the bitmap for clearing this characteristic
+ */
+ int clear() {
+ return clear;
+ }
+
+ /**
+ * Determines if this flag is a stream-based flag.
+ *
+ * @return true if a stream-based flag, otherwise false.
+ */
+ boolean isStreamFlag() {
+ return maskTable.get(Type.STREAM) > 0;
+ }
+
+ /**
+ * Checks if this flag is set on stream flags, injected on operation flags,
+ * and injected on combined stream and operation flags.
+ *
+ * @param flags the stream flags, operation flags, or combined stream and
+ * operation flags
+ * @return true if this flag is known, otherwise false.
+ */
+ boolean isKnown(int flags) {
+ return (flags & preserve) == set;
+ }
+
+ /**
+ * Checks if this flag is cleared on operation flags or combined stream and
+ * operation flags.
+ *
+ * @param flags the operation flags or combined stream and operations flags.
+ * @return true if this flag is preserved, otherwise false.
+ */
+ boolean isCleared(int flags) {
+ return (flags & preserve) == clear;
+ }
+
+ /**
+ * Checks if this flag is preserved on combined stream and operation flags.
+ *
+ * @param flags the combined stream and operations flags.
+ * @return true if this flag is preserved, otherwise false.
+ */
+ boolean isPreserved(int flags) {
+ return (flags & preserve) == preserve;
+ }
+
+ /**
+ * Determines if this flag can be set for a flag type.
+ *
+ * @param t the flag type.
+ * @return true if this flag can be set for the flag type, otherwise false.
+ */
+ boolean canSet(Type t) {
+ return (maskTable.get(t) & SET_BITS) > 0;
+ }
+
+ /**
+ * The bit mask for spliterator characteristics
+ */
+ static final int SPLITERATOR_CHARACTERISTICS_MASK = createMask(Type.SPLITERATOR);
+
+ /**
+ * The bit mask for source stream flags.
+ */
+ static final int STREAM_MASK = createMask(Type.STREAM);
+
+ /**
+ * The bit mask for intermediate operation flags.
+ */
+ static final int OP_MASK = createMask(Type.OP);
+
+ /**
+ * The bit mask for terminal operation flags.
+ */
+ static final int TERMINAL_OP_MASK = createMask(Type.TERMINAL_OP);
+
+ /**
+ * The bit mask for upstream terminal operation flags.
+ */
+ static final int UPSTREAM_TERMINAL_OP_MASK = createMask(Type.UPSTREAM_TERMINAL_OP);
+
+ private static int createMask(Type t) {
+ int mask = 0;
+ for (StreamOpFlag flag : StreamOpFlag.values()) {
+ mask |= flag.maskTable.get(t) << flag.bitPosition;
+ }
+ return mask;
+ }
+
+ /**
+ * Complete flag mask.
+ */
+ private static final int FLAG_MASK = createFlagMask();
+
+ private static int createFlagMask() {
+ int mask = 0;
+ for (StreamOpFlag flag : StreamOpFlag.values()) {
+ mask |= flag.preserve;
+ }
+ return mask;
+ }
+
+ /**
+ * Flag mask for stream flags that are set.
+ */
+ private static final int FLAG_MASK_IS = STREAM_MASK;
+
+ /**
+ * Flag mask for stream flags that are cleared.
+ */
+ private static final int FLAG_MASK_NOT = STREAM_MASK << 1;
+
+ /**
+ * The initial value to be combined with the stream flags of the first
+ * stream in the pipeline.
+ */
+ static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT;
+
+ /**
+ * The bit value to set or inject {@link #DISTINCT}.
+ */
+ static final int IS_DISTINCT = DISTINCT.set;
+
+ /**
+ * The bit value to clear {@link #DISTINCT}.
+ */
+ static final int NOT_DISTINCT = DISTINCT.clear;
+
+ /**
+ * The bit value to set or inject {@link #SORTED}.
+ */
+ static final int IS_SORTED = SORTED.set;
+
+ /**
+ * The bit value to clear {@link #SORTED}.
+ */
+ static final int NOT_SORTED = SORTED.clear;
+
+ /**
+ * The bit value to set or inject {@link #ORDERED}.
+ */
+ static final int IS_ORDERED = ORDERED.set;
+
+ /**
+ * The bit value to clear {@link #ORDERED}.
+ */
+ static final int NOT_ORDERED = ORDERED.clear;
+
+ /**
+ * The bit value to set {@link #SIZED}.
+ */
+ static final int IS_SIZED = SIZED.set;
+
+ /**
+ * The bit value to clear {@link #SIZED}.
+ */
+ static final int NOT_SIZED = SIZED.clear;
+
+ /**
+ * The bit value to inject {@link #SHORT_CIRCUIT}.
+ */
+ static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;
+
+ private static int getMask(int flags) {
+ return (flags == 0)
+ ? FLAG_MASK
+ : ~(flags | ((FLAG_MASK_IS & flags) << 1) | ((FLAG_MASK_NOT & flags) >> 1));
+ }
+
+ /**
+ * Combines stream or operation flags with previously combined stream and
+ * operation flags to produce updated combined stream and operation flags.
+ * <p>
+ * A flag set on stream flags or injected on operation flags,
+ * and injected combined stream and operation flags,
+ * will be injected on the updated combined stream and operation flags.
+ *
+ * <p>
+ * A flag set on stream flags or injected on operation flags,
+ * and cleared on the combined stream and operation flags,
+ * will be cleared on the updated combined stream and operation flags.
+ *
+ * <p>
+ * A flag set on the stream flags or injected on operation flags,
+ * and preserved on the combined stream and operation flags,
+ * will be injected on the updated combined stream and operation flags.
+ *
+ * <p>
+ * A flag not set on the stream flags or cleared/preserved on operation
+ * flags, and injected on the combined stream and operation flags,
+ * will be injected on the updated combined stream and operation flags.
+ *
+ * <p>
+ * A flag not set on the stream flags or cleared/preserved on operation
+ * flags, and cleared on the combined stream and operation flags,
+ * will be cleared on the updated combined stream and operation flags.
+ *
+ * <p>
+ * A flag not set on the stream flags,
+ * and preserved on the combined stream and operation flags
+ * will be preserved on the updated combined stream and operation flags.
+ *
+ * <p>
+ * A flag cleared on operation flags,
+ * and preserved on the combined stream and operation flags
+ * will be cleared on the updated combined stream and operation flags.
+ *
+ * <p>
+ * A flag preserved on operation flags,
+ * and preserved on the combined stream and operation flags
+ * will be preserved on the updated combined stream and operation flags.
+ *
+ * @param newStreamOrOpFlags the stream or operation flags.
+ * @param prevCombOpFlags previously combined stream and operation flags.
+ * The value {#link INITIAL_OPS_VALUE} must be used as the seed value.
+ * @return the updated combined stream and operation flags.
+ */
+ static int combineOpFlags(int newStreamOrOpFlags, int prevCombOpFlags) {
+ // 0x01 or 0x10 nibbles are transformed to 0x11
+ // 0x00 nibbles remain unchanged
+ // Then all the bits are flipped
+ // Then the result is logically or'ed with the operation flags.
+ return (prevCombOpFlags & StreamOpFlag.getMask(newStreamOrOpFlags)) | newStreamOrOpFlags;
+ }
+
+ /**
+ * Converts combined stream and operation flags to stream flags.
+ *
+ * <p>Each flag injected on the combined stream and operation flags will be
+ * set on the stream flags.
+ *
+ * @param combOpFlags the combined stream and operation flags.
+ * @return the stream flags.
+ */
+ static int toStreamFlags(int combOpFlags) {
+ // By flipping the nibbles 0x11 become 0x00 and 0x01 become 0x10
+ // Shift left 1 to restore set flags and mask off anything other than the set flags
+ return ((~combOpFlags) >> 1) & FLAG_MASK_IS & combOpFlags;
+ }
+
+ /**
+ * Converts stream flags to a spliterator characteristic bit set.
+ *
+ * @param streamFlags the stream flags.
+ * @return the spliterator characteristic bit set.
+ */
+ static int toCharacteristics(int streamFlags) {
+ return streamFlags & SPLITERATOR_CHARACTERISTICS_MASK;
+ }
+
+ /**
+ * Converts a spliterator characteristic bit set to stream flags.
+ *
+ * @implSpec
+ * If the spliterator is naturally {@code SORTED} (the associated
+ * {@code Comparator} is {@code null}) then the characteristic is converted
+ * to the {@link #SORTED} flag, otherwise the characteristic is not
+ * converted.
+ *
+ * @param spliterator the spliterator from which to obtain characteristic
+ * bit set.
+ * @return the stream flags.
+ */
+ static int fromCharacteristics(Spliterator<?> spliterator) {
+ int characteristics = spliterator.characteristics();
+ if ((characteristics & Spliterator.SORTED) != 0 && spliterator.getComparator() != null) {
+ // Do not propagate the SORTED characteristic if it does not correspond
+ // to a natural sort order
+ return characteristics & SPLITERATOR_CHARACTERISTICS_MASK & ~Spliterator.SORTED;
+ }
+ else {
+ return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
+ }
+ }
+
+ /**
+ * Converts a spliterator characteristic bit set to stream flags.
+ *
+ * @param characteristics the spliterator characteristic bit set.
+ * @return the stream flags.
+ */
+ static int fromCharacteristics(int characteristics) {
+ return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/StreamShape.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+/**
+ * An enum describing the known shape specializations for stream abstractions.
+ * Each will correspond to a specific subinterface of {@link BaseStream}
+ * (e.g., {@code REFERENCE} corresponds to {@code Stream}, {@code INT_VALUE}
+ * corresponds to {@code IntStream}). Each may also correspond to
+ * specializations of value-handling abstractions such as {@code Spliterator},
+ * {@code Consumer}, etc.
+ *
+ * @apiNote
+ * This enum is used by implementations to determine compatibility between
+ * streams and operations (i.e., if the output shape of a stream is compatible
+ * with the input shape of the next operation).
+ *
+ * <p>Some APIs require you to specify both a generic type and a stream shape
+ * for input or output elements, such as {@link TerminalOp} which has both
+ * generic type parameters for its input types, and a getter for the
+ * input shape. When representing primitive streams in this way, the
+ * generic type parameter should correspond to the wrapper type for that
+ * primitive type.
+ *
+ * @since 1.8
+ */
+enum StreamShape {
+ /**
+ * The shape specialization corresponding to {@code Stream} and elements
+ * that are object references.
+ */
+ REFERENCE,
+ /**
+ * The shape specialization corresponding to {@code IntStream} and elements
+ * that are {@code int} values.
+ */
+ INT_VALUE,
+ /**
+ * The shape specialization corresponding to {@code LongStream} and elements
+ * that are {@code long} values.
+ */
+ LONG_VALUE,
+ /**
+ * The shape specialization corresponding to {@code DoubleStream} and
+ * elements that are {@code double} values.
+ */
+ DOUBLE_VALUE
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/TerminalOp.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+
+/**
+ * An operation in a stream pipeline that takes a stream as input and produces
+ * a result or side-effect. A {@code TerminalOp} has an input type and stream
+ * shape, and a result type. A {@code TerminalOp} also has a set of
+ * <em>operation flags</em> that describes how the operation processes elements
+ * of the stream (such as short-circuiting or respecting encounter order; see
+ * {@link StreamOpFlag}).
+ *
+ * <p>A {@code TerminalOp} must provide a sequential and parallel implementation
+ * of the operation relative to a given stream source and set of intermediate
+ * operations.
+ *
+ * @param <E_IN> the type of input elements
+ * @param <R> the type of the result
+ * @since 1.8
+ */
+interface TerminalOp<E_IN, R> {
+ /**
+ * Gets the shape of the input type of this operation.
+ *
+ * @implSpec The default returns {@code StreamShape.REFERENCE}.
+ *
+ * @return StreamShape of the input type of this operation
+ */
+ default StreamShape inputShape() { return StreamShape.REFERENCE; }
+
+ /**
+ * Gets the stream flags of the operation. Terminal operations may set a
+ * limited subset of the stream flags defined in {@link StreamOpFlag}, and
+ * these flags are combined with the previously combined stream and
+ * intermediate operation flags for the pipeline.
+ *
+ * @implSpec The default implementation returns zero.
+ *
+ * @return the stream flags for this operation
+ * @see StreamOpFlag
+ */
+ default int getOpFlags() { return 0; }
+
+ /**
+ * Performs a parallel evaluation of the operation using the specified
+ * {@code PipelineHelper}, which describes the upstream intermediate
+ * operations.
+ *
+ * @implSpec The default performs a sequential evaluation of the operation
+ * using the specified {@code PipelineHelper}.
+ *
+ * @param helper the pipeline helper
+ * @param spliterator the source spliterator
+ * @return the result of the evaluation
+ */
+ default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
+ Spliterator<P_IN> spliterator) {
+ if (Tripwire.ENABLED)
+ Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
+ return evaluateSequential(helper, spliterator);
+ }
+
+ /**
+ * Performs a sequential evaluation of the operation using the specified
+ * {@code PipelineHelper}, which describes the upstream intermediate
+ * operations.
+ *
+ * @param helper the pipeline helper
+ * @param spliterator the source spliterator
+ * @return the result of the evaluation
+ */
+ <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
+ Spliterator<P_IN> spliterator);
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/TerminalSink.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.function.Supplier;
+
+/**
+ * A {@link Sink} which accumulates state as elements are accepted, and allows
+ * a result to be retrieved after the computation is finished.
+ *
+ * @param <T> the type of elements to be accepted
+ * @param <R> the type of the result
+ *
+ * @since 1.8
+ */
+interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/stream/Tripwire.java Tue Apr 16 22:50:48 2013 -0400
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import sun.util.logging.PlatformLogger;
+
+/**
+ * Utility class for detecting inadvertent uses of boxing in
+ * {@code java.util.stream} classes. The detection is turned on or off based on
+ * whether the system property {@code org.openjdk.java.util.stream.tripwire} is
+ * considered {@code true} according to {@link Boolean#getBoolean(String)}.
+ * This should normally be turned off for production use.
+ *
+ * @apiNote
+ * Typical usage would be for boxing code to do:
+ * <pre>{@code
+ * if (Tripwire.ENABLED)
+ * Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
+ * }</pre>
+ *
+ * @since 1.8
+ */
+final class Tripwire {
+ private static final String TRIPWIRE_PROPERTY = "org.openjdk.java.util.stream.tripwire";
+
+ /** Should debugging checks be enabled? */
+ static final boolean ENABLED = AccessController.doPrivileged(
+ (PrivilegedAction<Boolean>) () -> Boolean.getBoolean(TRIPWIRE_PROPERTY));
+
+ private Tripwire() { }
+
+ /**
+ * Produces a log warning, using {@code PlatformLogger.getLogger(className)},
+ * using the supplied message. The class name of {@code trippingClass} will
+ * be used as the first parameter to the message.
+ *
+ * @param trippingClass Name of the class generating the message
+ * @param msg A message format string of the type expected by
+ * {@link PlatformLogger}
+ */
+ static void trip(Class<?> trippingClass, String msg) {
+ PlatformLogger.getLogger(trippingClass.getName()).warning(msg, trippingClass.getName());
+ }
+}