# HG changeset patch # User psandoz # Date 1372408161 -7200 # Node ID 53b8b8c30086949bf379c34885ab878f9d5cb1c3 # Parent 8e3cb3c46ae8939395f34f5dbf9679db08b483a0 8012987: Optimizations for Stream.limit/substream Reviewed-by: mduigou Contributed-by: Brian Goetz , Paul Sandoz diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/AbstractPipeline.java --- a/jdk/src/share/classes/java/util/stream/AbstractPipeline.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/AbstractPipeline.java Fri Jun 28 10:29:21 2013 +0200 @@ -375,6 +375,12 @@ // NOTE: there are no size-injecting ops if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { backPropagationHead = p; + // Clear the short circuit flag for next pipeline stage + // This stage encapsulates short-circuiting, the next + // stage may not have any short-circuit operations, and + // if so spliterator.forEachRemaining should be be used + // for traversal + thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; } depth = 0; @@ -448,6 +454,15 @@ // PipelineHelper @Override + final StreamShape getSourceShape() { + AbstractPipeline p = AbstractPipeline.this; + while (p.depth > 0) { + p = p.previousStage; + } + return p.getOutputShape(); + } + + @Override final long exactOutputSizeIfKnown(Spliterator spliterator) { return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1; } @@ -503,6 +518,16 @@ } @Override + final Spliterator wrapSpliterator(Spliterator sourceSpliterator) { + if (depth == 0) { + return (Spliterator) sourceSpliterator; + } + else { + return wrap(this, () -> sourceSpliterator, isParallel()); + } + } + + @Override @SuppressWarnings("unchecked") final Node evaluate(Spliterator spliterator, boolean flatten, diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/AbstractTask.java --- a/jdk/src/share/classes/java/util/stream/AbstractTask.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/AbstractTask.java Fri Jun 28 10:29:21 2013 +0200 @@ -316,6 +316,7 @@ else { K l = task.leftChild = task.makeChild(split); K r = task.rightChild = task.makeChild(task.spliterator); + task.spliterator = null; task.setPendingCount(1); l.fork(); task = r; diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/DoubleStream.java --- a/jdk/src/share/classes/java/util/stream/DoubleStream.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/DoubleStream.java Fri Jun 28 10:29:21 2013 +0200 @@ -743,14 +743,7 @@ */ public static DoubleStream generate(DoubleSupplier s) { Objects.requireNonNull(s); - return StreamSupport.doubleStream(Spliterators.spliteratorUnknownSize( - new PrimitiveIterator.OfDouble() { - @Override - public boolean hasNext() { return true; } - - @Override - public double nextDouble() { return s.getAsDouble(); } - }, - Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL)); + return StreamSupport.doubleStream( + new StreamSpliterators.InfiniteSupplyingSpliterator.OfDouble(Long.MAX_VALUE, s)); } } diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/ForEachOps.java --- a/jdk/src/share/classes/java/util/stream/ForEachOps.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/ForEachOps.java Fri Jun 28 10:29:21 2013 +0200 @@ -342,7 +342,7 @@ doCompute(this); } - private static void doCompute(ForEachOrderedTask task) { + private static void doCompute(ForEachOrderedTask task) { while (true) { Spliterator split; if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize) diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/IntStream.java --- a/jdk/src/share/classes/java/util/stream/IntStream.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/IntStream.java Fri Jun 28 10:29:21 2013 +0200 @@ -745,15 +745,8 @@ */ public static IntStream generate(IntSupplier s) { Objects.requireNonNull(s); - return StreamSupport.intStream(Spliterators.spliteratorUnknownSize( - new PrimitiveIterator.OfInt() { - @Override - public boolean hasNext() { return true; } - - @Override - public int nextInt() { return s.getAsInt(); } - }, - Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL)); + return StreamSupport.intStream( + new StreamSpliterators.InfiniteSupplyingSpliterator.OfInt(Long.MAX_VALUE, s)); } /** diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/LongStream.java --- a/jdk/src/share/classes/java/util/stream/LongStream.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/LongStream.java Fri Jun 28 10:29:21 2013 +0200 @@ -736,15 +736,8 @@ */ public static LongStream generate(LongSupplier s) { Objects.requireNonNull(s); - return StreamSupport.longStream(Spliterators.spliteratorUnknownSize( - new PrimitiveIterator.OfLong() { - @Override - public boolean hasNext() { return true; } - - @Override - public long nextLong() { return s.getAsLong(); } - }, - Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL)); + return StreamSupport.longStream( + new StreamSpliterators.InfiniteSupplyingSpliterator.OfLong(Long.MAX_VALUE, s)); } /** diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/PipelineHelper.java --- a/jdk/src/share/classes/java/util/stream/PipelineHelper.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/PipelineHelper.java Fri Jun 28 10:29:21 2013 +0200 @@ -44,7 +44,7 @@ * and {@link AbstractPipeline#opEvaluateParallel(PipelineHelper, java.util.Spliterator, * java.util.function.IntFunction)}, methods, which can use the * {@code PipelineHelper} to access information about the pipeline such as - * input shape, output shape, stream flags, and size, and use the helper methods + * head shape, stream flags, and size, and use the helper methods * such as {@link #wrapAndCopyInto(Sink, Spliterator)}, * {@link #copyInto(Sink, Spliterator)}, and {@link #wrapSink(Sink)} to execute * pipeline operations. @@ -55,6 +55,13 @@ abstract class PipelineHelper { /** + * Gets the stream shape for the source of the pipeline segment. + * + * @return the stream shape for the source of the pipeline segment. + */ + abstract StreamShape getSourceShape(); + + /** * Gets the combined stream and operation flags for the output of the described * pipeline. This will incorporate stream flags from the stream source, all * the intermediate operations and the terminal operation. @@ -146,6 +153,14 @@ abstract Sink wrapSink(Sink sink); /** + * + * @param spliterator + * @param + * @return + */ + abstract Spliterator wrapSpliterator(Spliterator spliterator); + + /** * Constructs a @{link Node.Builder} compatible with the output shape of * this {@code PipelineHelper}. * diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/SliceOps.java --- a/jdk/src/share/classes/java/util/stream/SliceOps.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/SliceOps.java Fri Jun 28 10:29:21 2013 +0200 @@ -24,14 +24,9 @@ */ package java.util.stream; -import java.util.ArrayList; -import java.util.List; import java.util.Spliterator; import java.util.concurrent.CountedCompleter; -import java.util.function.DoubleConsumer; -import java.util.function.IntConsumer; import java.util.function.IntFunction; -import java.util.function.LongConsumer; /** * Factory for instances of a short-circuiting stateful intermediate operations @@ -45,6 +40,63 @@ private SliceOps() { } /** + * Calculates the sliced size given the current size, number of elements + * skip, and the number of elements to limit. + * + * @param size the current size + * @param skip the number of elements to skip, assumed to be >= 0 + * @param limit the number of elements to limit, assumed to be >= 0, with + * a value of {@code Long.MAX_VALUE} if there is no limit + * @return the sliced size + */ + private static long calcSize(long size, long skip, long limit) { + return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1; + } + + /** + * Calculates the slice fence, which is one past the index of the slice + * range + * @param skip the number of elements to skip, assumed to be >= 0 + * @param limit the number of elements to limit, assumed to be >= 0, with + * a value of {@code Long.MAX_VALUE} if there is no limit + * @return the slice fence. + */ + private static long calcSliceFence(long skip, long limit) { + long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE; + // Check for overflow + return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE; + } + + /** + * Creates a slice spliterator given a stream shape governing the + * spliterator type. Requires that the underlying Spliterator + * be SUBSIZED. + */ + @SuppressWarnings("unchecked") + private static Spliterator sliceSpliterator(StreamShape shape, + Spliterator s, + long skip, long limit) { + assert s.hasCharacteristics(Spliterator.SUBSIZED); + long sliceFence = calcSliceFence(skip, limit); + switch (shape) { + case REFERENCE: + return new StreamSpliterators + .SliceSpliterator.OfRef<>(s, skip, sliceFence); + case INT_VALUE: + return (Spliterator) new StreamSpliterators + .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence); + case LONG_VALUE: + return (Spliterator) new StreamSpliterators + .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence); + case DOUBLE_VALUE: + return (Spliterator) new StreamSpliterators + .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence); + default: + throw new IllegalStateException("Unknown shape " + shape); + } + } + + /** * Appends a "slice" operation to the provided stream. The slice operation * may be may be skip-only, limit-only, or skip-and-limit. * @@ -61,11 +113,71 @@ return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE, flags(limit)) { + Spliterator unorderedSkipLimitSpliterator(Spliterator s, + long skip, long limit, long sizeIfKnown) { + if (skip <= sizeIfKnown) { + // Use just the limit if the number of elements + // to skip is <= the known pipeline size + limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; + skip = 0; + } + return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit); + } + + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { + long size = helper.exactOutputSizeIfKnown(spliterator); + if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { + return new StreamSpliterators.SliceSpliterator.OfRef<>( + helper.wrapSpliterator(spliterator), + skip, + calcSliceFence(skip, limit)); + } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return unorderedSkipLimitSpliterator( + helper.wrapSpliterator(spliterator), + skip, limit, size); + } + else { + // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n) + // regardless of the value of n + // Need to adjust the target size of splitting for the + // SliceTask from say (size / k) to say min(size / k, 1 << 14) + // This will limit the size of the buffers created at the leaf nodes + // cancellation will be more aggressive cancelling later tasks + // if the target slice size has been reached from a given task, + // cancellation should also clear local results if any + return new SliceTask<>(this, helper, spliterator, i -> (T[]) new Object[i], skip, limit). + invoke().spliterator(); + } + } + @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { - return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke(); + long size = helper.exactOutputSizeIfKnown(spliterator); + if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { + // Because the pipeline is SIZED the slice spliterator + // can be created from the source, this requires matching + // to shape of the source, and is potentially more efficient + // than creating the slice spliterator from the pipeline + // wrapping spliterator + Spliterator s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); + return Nodes.collect(helper, s, true, generator); + } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + Spliterator s = unorderedSkipLimitSpliterator( + helper.wrapSpliterator(spliterator), + skip, limit, size); + // Collect using this pipeline, which is empty and therefore + // can be used with the pipeline wrapping spliterator + // Note that we cannot create a slice spliterator from + // the source spliterator if the pipeline is not SIZED + return Nodes.collect(this, s, true, generator); + } + else { + return new SliceTask<>(this, helper, spliterator, generator, skip, limit). + invoke(); + } } @Override @@ -75,6 +187,11 @@ long m = limit >= 0 ? limit : Long.MAX_VALUE; @Override + public void begin(long size) { + downstream.begin(calcSize(size, skip, m)); + } + + @Override public void accept(T t) { if (n == 0) { if (m > 0) { @@ -112,11 +229,64 @@ return new IntPipeline.StatefulOp(upstream, StreamShape.INT_VALUE, flags(limit)) { + Spliterator.OfInt unorderedSkipLimitSpliterator( + Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) { + if (skip <= sizeIfKnown) { + // Use just the limit if the number of elements + // to skip is <= the known pipeline size + limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; + skip = 0; + } + return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit); + } + + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + long size = helper.exactOutputSizeIfKnown(spliterator); + if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { + return new StreamSpliterators.SliceSpliterator.OfInt( + (Spliterator.OfInt) helper.wrapSpliterator(spliterator), + skip, + calcSliceFence(skip, limit)); + } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return unorderedSkipLimitSpliterator( + (Spliterator.OfInt) helper.wrapSpliterator(spliterator), + skip, limit, size); + } + else { + return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit). + invoke().spliterator(); + } + } + @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { - return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke(); + long size = helper.exactOutputSizeIfKnown(spliterator); + if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { + // Because the pipeline is SIZED the slice spliterator + // can be created from the source, this requires matching + // to shape of the source, and is potentially more efficient + // than creating the slice spliterator from the pipeline + // wrapping spliterator + Spliterator s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); + return Nodes.collectInt(helper, s, true); + } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + Spliterator.OfInt s = unorderedSkipLimitSpliterator( + (Spliterator.OfInt) helper.wrapSpliterator(spliterator), + skip, limit, size); + // Collect using this pipeline, which is empty and therefore + // can be used with the pipeline wrapping spliterator + // Note that we cannot create a slice spliterator from + // the source spliterator if the pipeline is not SIZED + return Nodes.collectInt(this, s, true); + } + else { + return new SliceTask<>(this, helper, spliterator, generator, skip, limit). + invoke(); + } } @Override @@ -126,6 +296,11 @@ long m = limit >= 0 ? limit : Long.MAX_VALUE; @Override + public void begin(long size) { + downstream.begin(calcSize(size, skip, m)); + } + + @Override public void accept(int t) { if (n == 0) { if (m > 0) { @@ -163,11 +338,64 @@ return new LongPipeline.StatefulOp(upstream, StreamShape.LONG_VALUE, flags(limit)) { + Spliterator.OfLong unorderedSkipLimitSpliterator( + Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) { + if (skip <= sizeIfKnown) { + // Use just the limit if the number of elements + // to skip is <= the known pipeline size + limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; + skip = 0; + } + return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit); + } + + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + long size = helper.exactOutputSizeIfKnown(spliterator); + if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { + return new StreamSpliterators.SliceSpliterator.OfLong( + (Spliterator.OfLong) helper.wrapSpliterator(spliterator), + skip, + calcSliceFence(skip, limit)); + } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return unorderedSkipLimitSpliterator( + (Spliterator.OfLong) helper.wrapSpliterator(spliterator), + skip, limit, size); + } + else { + return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit). + invoke().spliterator(); + } + } + @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { - return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke(); + long size = helper.exactOutputSizeIfKnown(spliterator); + if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { + // Because the pipeline is SIZED the slice spliterator + // can be created from the source, this requires matching + // to shape of the source, and is potentially more efficient + // than creating the slice spliterator from the pipeline + // wrapping spliterator + Spliterator s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); + return Nodes.collectLong(helper, s, true); + } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + Spliterator.OfLong s = unorderedSkipLimitSpliterator( + (Spliterator.OfLong) helper.wrapSpliterator(spliterator), + skip, limit, size); + // Collect using this pipeline, which is empty and therefore + // can be used with the pipeline wrapping spliterator + // Note that we cannot create a slice spliterator from + // the source spliterator if the pipeline is not SIZED + return Nodes.collectLong(this, s, true); + } + else { + return new SliceTask<>(this, helper, spliterator, generator, skip, limit). + invoke(); + } } @Override @@ -177,6 +405,11 @@ long m = limit >= 0 ? limit : Long.MAX_VALUE; @Override + public void begin(long size) { + downstream.begin(calcSize(size, skip, m)); + } + + @Override public void accept(long t) { if (n == 0) { if (m > 0) { @@ -214,11 +447,64 @@ return new DoublePipeline.StatefulOp(upstream, StreamShape.DOUBLE_VALUE, flags(limit)) { + Spliterator.OfDouble unorderedSkipLimitSpliterator( + Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) { + if (skip <= sizeIfKnown) { + // Use just the limit if the number of elements + // to skip is <= the known pipeline size + limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; + skip = 0; + } + return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit); + } + + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + long size = helper.exactOutputSizeIfKnown(spliterator); + if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { + return new StreamSpliterators.SliceSpliterator.OfDouble( + (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), + skip, + calcSliceFence(skip, limit)); + } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return unorderedSkipLimitSpliterator( + (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), + skip, limit, size); + } + else { + return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit). + invoke().spliterator(); + } + } + @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { - return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke(); + long size = helper.exactOutputSizeIfKnown(spliterator); + if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { + // Because the pipeline is SIZED the slice spliterator + // can be created from the source, this requires matching + // to shape of the source, and is potentially more efficient + // than creating the slice spliterator from the pipeline + // wrapping spliterator + Spliterator s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); + return Nodes.collectDouble(helper, s, true); + } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + Spliterator.OfDouble s = unorderedSkipLimitSpliterator( + (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), + skip, limit, size); + // Collect using this pipeline, which is empty and therefore + // can be used with the pipeline wrapping spliterator + // Note that we cannot create a slice spliterator from + // the source spliterator if the pipeline is not SIZED + return Nodes.collectDouble(this, s, true); + } + else { + return new SliceTask<>(this, helper, spliterator, generator, skip, limit). + invoke(); + } } @Override @@ -228,6 +514,11 @@ long m = limit >= 0 ? limit : Long.MAX_VALUE; @Override + public void begin(long size) { + downstream.begin(calcSize(size, skip, m)); + } + + @Override public void accept(double t) { if (n == 0) { if (m > 0) { @@ -253,20 +544,6 @@ return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0); } - // Parallel strategy -- two cases - // IF we have full size information - // - decompose, keeping track of each leaf's (offset, size) - // - calculate leaf only if intersection between (offset, size) and desired slice - // - Construct a Node containing the appropriate sections of the appropriate leaves - // IF we don't - // - decompose, and calculate size of each leaf - // - on complete of any node, compute completed initial size from the root, and if big enough, cancel later nodes - // - @@@ this can be significantly improved - - // @@@ Currently we don't do the sized version at all - - // @@@ Should take into account ORDERED flag; if not ORDERED, we can limit in temporal order instead - /** * {@code ForkJoinTask} implementing slice computation. * @@ -319,19 +596,18 @@ ? op.exactOutputSizeIfKnown(spliterator) : -1; final Node.Builder nb = op.makeNodeBuilder(sizeIfKnown, generator); - Sink opSink = op.opWrapSink(op.sourceOrOpFlags, nb); - - if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(op.sourceOrOpFlags)) - helper.wrapAndCopyInto(opSink, spliterator); - else - helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator); - return nb.build(); + Sink opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb); + helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator); + // It is necessary to truncate here since the result at the root + // can only be set once + return doTruncate(nb.build()); } else { Node node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator), - spliterator).build(); + spliterator).build(); thisNodeSize = node.count(); completed = true; + spliterator = null; return node; } } @@ -339,198 +615,95 @@ @Override public final void onCompletion(CountedCompleter caller) { if (!isLeaf()) { + Node result; thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; + if (canceled) { + thisNodeSize = 0; + result = getEmptyResult(); + } + else if (thisNodeSize == 0) + result = getEmptyResult(); + else if (leftChild.thisNodeSize == 0) + result = rightChild.getLocalResult(); + else { + result = Nodes.conc(op.getOutputShape(), + leftChild.getLocalResult(), rightChild.getLocalResult()); + } + setLocalResult(isRoot() ? doTruncate(result) : result); completed = true; - - if (isRoot()) { - // Only collect nodes once absolute size information is known + } + if (targetSize >= 0 + && !isRoot() + && isLeftCompleted(targetOffset + targetSize)) + cancelLaterNodes(); - ArrayList> nodes = new ArrayList<>(); - visit(nodes, 0); - Node result; - if (nodes.size() == 0) - result = Nodes.emptyNode(op.getOutputShape()); - else if (nodes.size() == 1) - result = nodes.get(0); - else - // This will create a tree of depth 1 and will not be a sub-tree - // for leaf nodes within the require range - result = conc(op.getOutputShape(), nodes); - setLocalResult(result); - } - } - if (targetSize >= 0) { - if (((SliceTask) getRoot()).leftSize() >= targetOffset + targetSize) - cancelLaterNodes(); - } - // Don't call super.onCompletion(), we don't look at the child nodes until farther up the tree + super.onCompletion(caller); } - /** Compute the cumulative size of the longest leading prefix of completed children */ - private long leftSize() { + @Override + protected void cancel() { + super.cancel(); if (completed) - return thisNodeSize; - else if (isLeaf()) - return 0; - else { - long leftSize = 0; - for (SliceTask child = leftChild, p = null; child != p; - p = child, child = rightChild) { - if (child.completed) - leftSize += child.thisNodeSize; - else { - leftSize += child.leftSize(); - break; - } - } - return leftSize; - } + setLocalResult(getEmptyResult()); } - private void visit(List> results, int offset) { - if (!isLeaf()) { - for (SliceTask child = leftChild, p = null; child != p; - p = child, child = rightChild) { - child.visit(results, offset); - offset += child.thisNodeSize; - } - } - else { - if (results.size() == 0) { - if (offset + thisNodeSize >= targetOffset) - results.add(truncateNode(getLocalResult(), - Math.max(0, targetOffset - offset), - targetSize >= 0 ? Math.max(0, offset + thisNodeSize - (targetOffset + targetSize)) : 0)); - } - else { - if (targetSize == -1 || offset < targetOffset + targetSize) { - results.add(truncateNode(getLocalResult(), - 0, - targetSize >= 0 ? Math.max(0, offset + thisNodeSize - (targetOffset + targetSize)) : 0)); - } - } - } + private Node doTruncate(Node input) { + long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize; + return input.truncate(targetOffset, to, generator); } /** - * Return a new node describing the result of truncating an existing Node - * at the left and/or right. - */ - private Node truncateNode(Node input, - long skipLeft, long skipRight) { - if (skipLeft == 0 && skipRight == 0) - return input; - else { - return truncateNode(input, skipLeft, thisNodeSize - skipRight, generator); - } - } - /** - * Truncate a {@link Node}, returning a node describing a subsequence of - * the contents of the input node. + * Determine if the number of completed elements in this node and nodes + * to the left of this node is greater than or equal to the target size. * - * @param the type of elements of the input node and truncated node - * @param input the input node - * @param from the starting offset to include in the truncated node (inclusive) - * @param to the ending offset ot include in the truncated node (exclusive) - * @param generator the array factory (only used for reference nodes) - * @return the truncated node + * @param target the target size + * @return true if the number of elements is greater than or equal to + * the target size, otherwise false. */ - @SuppressWarnings("unchecked") - private static Node truncateNode(Node input, long from, long to, IntFunction generator) { - StreamShape shape = input.getShape(); - long size = truncatedSize(input.count(), from, to); - if (size == 0) - return Nodes.emptyNode(shape); - else if (from == 0 && to >= input.count()) - return input; - - switch (shape) { - case REFERENCE: { - Spliterator spliterator = input.spliterator(); - Node.Builder nodeBuilder = Nodes.builder(size, generator); - nodeBuilder.begin(size); - for (int i = 0; i < from && spliterator.tryAdvance(e -> { }); i++) { } - for (int i = 0; (i < size) && spliterator.tryAdvance(nodeBuilder); i++) { } - nodeBuilder.end(); - return nodeBuilder.build(); - } - case INT_VALUE: { - Spliterator.OfInt spliterator = ((Node.OfInt) input).spliterator(); - Node.Builder.OfInt nodeBuilder = Nodes.intBuilder(size); - nodeBuilder.begin(size); - for (int i = 0; i < from && spliterator.tryAdvance((IntConsumer) e -> { }); i++) { } - for (int i = 0; (i < size) && spliterator.tryAdvance((IntConsumer) nodeBuilder); i++) { } - nodeBuilder.end(); - return (Node) nodeBuilder.build(); + private boolean isLeftCompleted(long target) { + long size = completed ? thisNodeSize : completedSize(target); + if (size >= target) + return true; + for (SliceTask parent = getParent(), node = this; + parent != null; + node = parent, parent = parent.getParent()) { + if (node == parent.rightChild) { + SliceTask left = parent.leftChild; + if (left != null) { + size += left.completedSize(target); + if (size >= target) + return true; + } } - case LONG_VALUE: { - Spliterator.OfLong spliterator = ((Node.OfLong) input).spliterator(); - Node.Builder.OfLong nodeBuilder = Nodes.longBuilder(size); - nodeBuilder.begin(size); - for (int i = 0; i < from && spliterator.tryAdvance((LongConsumer) e -> { }); i++) { } - for (int i = 0; (i < size) && spliterator.tryAdvance((LongConsumer) nodeBuilder); i++) { } - nodeBuilder.end(); - return (Node) nodeBuilder.build(); - } - case DOUBLE_VALUE: { - Spliterator.OfDouble spliterator = ((Node.OfDouble) input).spliterator(); - Node.Builder.OfDouble nodeBuilder = Nodes.doubleBuilder(size); - nodeBuilder.begin(size); - for (int i = 0; i < from && spliterator.tryAdvance((DoubleConsumer) e -> { }); i++) { } - for (int i = 0; (i < size) && spliterator.tryAdvance((DoubleConsumer) nodeBuilder); i++) { } - nodeBuilder.end(); - return (Node) nodeBuilder.build(); - } - default: - throw new IllegalStateException("Unknown shape " + shape); } - } - - private static long truncatedSize(long size, long from, long to) { - if (from >= 0) - size = Math.max(0, size - from); - long limit = to - from; - if (limit >= 0) - size = Math.min(size, limit); - return size; + return size >= target; } /** - * Produces a concatenated {@link Node} that has two or more children. - *

The count of the concatenated node is equal to the sum of the count - * of each child. Traversal of the concatenated node traverses the content - * of each child in encounter order of the list of children. Splitting a - * spliterator obtained from the concatenated node preserves the encounter - * order of the list of children. - * - *

The result may be a concatenated node, the input sole node if the size - * of the list is 1, or an empty node. + * Compute the number of completed elements in this node. + *

+ * Computation terminates if all nodes have been processed or the + * number of completed elements is greater than or equal to the target + * size. * - * @param the type of elements of the concatenated node - * @param shape the shape of the concatenated node to be created - * @param nodes the input nodes - * @return a {@code Node} covering the elements of the input nodes - * @throws IllegalStateException if all {@link Node} elements of the list - * are an not instance of type supported by this factory. + * @param target the target size + * @return return the number of completed elements */ - @SuppressWarnings("unchecked") - private static Node conc(StreamShape shape, List> nodes) { - int size = nodes.size(); - if (size == 0) - return Nodes.emptyNode(shape); - else if (size == 1) - return nodes.get(0); + private long completedSize(long target) { + if (completed) + return thisNodeSize; else { - // Create a right-balanced tree when there are more that 2 nodes - List> refNodes = (List>) nodes; - Node c = Nodes.conc(shape, refNodes.get(size - 2), refNodes.get(size - 1)); - for (int i = size - 3; i >= 0; i--) { - c = Nodes.conc(shape, refNodes.get(i), c); + SliceTask left = leftChild; + SliceTask right = rightChild; + if (left == null || right == null) { + // must be completed + return thisNodeSize; } - return c; + else { + long leftSize = left.completedSize(target); + return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target); + } } } - } - } diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/Stream.java --- a/jdk/src/share/classes/java/util/stream/Stream.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/Stream.java Fri Jun 28 10:29:21 2013 +0200 @@ -880,14 +880,7 @@ */ public static Stream generate(Supplier s) { Objects.requireNonNull(s); - return StreamSupport.stream(Spliterators.spliteratorUnknownSize( - new Iterator() { - @Override - public boolean hasNext() { return true; } - - @Override - public T next() { return s.get(); } - }, - Spliterator.ORDERED | Spliterator.IMMUTABLE)); + return StreamSupport.stream( + new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s)); } } diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/src/share/classes/java/util/stream/StreamSpliterators.java --- a/jdk/src/share/classes/java/util/stream/StreamSpliterators.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/src/share/classes/java/util/stream/StreamSpliterators.java Fri Jun 28 10:29:21 2013 +0200 @@ -26,11 +26,15 @@ import java.util.Comparator; import java.util.Spliterator; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.DoubleConsumer; +import java.util.function.DoubleSupplier; import java.util.function.IntConsumer; +import java.util.function.IntSupplier; import java.util.function.LongConsumer; +import java.util.function.LongSupplier; import java.util.function.Supplier; /** @@ -212,9 +216,10 @@ @Override public final long estimateSize() { init(); - return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags()) - ? spliterator.estimateSize() - : Long.MAX_VALUE; + // Use the estimate of the wrapped spliterator + // Note this may not be accurate if there are filter/flatMap + // operations filtering or adding elements to the stream + return spliterator.estimateSize(); } @Override @@ -240,7 +245,7 @@ // but for sub-splits only an estimate is known if ((c & Spliterator.SIZED) != 0) { c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED); - c |= (spliterator.characteristics() & Spliterator.SIZED & Spliterator.SUBSIZED); + c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED)); } return c; @@ -304,7 +309,7 @@ finished = true; } else { - while (tryAdvance(consumer)) { } + do { } while (tryAdvance(consumer)); } } } @@ -360,7 +365,7 @@ finished = true; } else { - while (tryAdvance(consumer)) { } + do { } while (tryAdvance(consumer)); } } } @@ -416,7 +421,7 @@ finished = true; } else { - while (tryAdvance(consumer)) { } + do { } while (tryAdvance(consumer)); } } } @@ -472,7 +477,7 @@ finished = true; } else { - while (tryAdvance(consumer)) { } + do { } while (tryAdvance(consumer)); } } } @@ -483,17 +488,17 @@ * first call to any spliterator method. * @param */ - static class DelegatingSpliterator implements Spliterator { - private final Supplier> supplier; + static class DelegatingSpliterator> + implements Spliterator { + private final Supplier supplier; - private Spliterator s; + private T_SPLITR s; - @SuppressWarnings("unchecked") - DelegatingSpliterator(Supplier> supplier) { - this.supplier = (Supplier>) supplier; + DelegatingSpliterator(Supplier supplier) { + this.supplier = supplier; } - Spliterator get() { + T_SPLITR get() { if (s == null) { s = supplier.get(); } @@ -501,8 +506,8 @@ } @Override - public Spliterator trySplit() { - return get().trySplit(); + public T_SPLITR trySplit() { + return (T_SPLITR) get().trySplit(); } @Override @@ -540,97 +545,881 @@ return getClass().getName() + "[" + get() + "]"; } - static final class OfInt extends DelegatingSpliterator implements Spliterator.OfInt { - private Spliterator.OfInt s; - - OfInt(Supplier supplier) { - super(supplier); - } - - @Override - Spliterator.OfInt get() { - if (s == null) { - s = (Spliterator.OfInt) super.get(); - } - return s; - } - - @Override - public Spliterator.OfInt trySplit() { - return get().trySplit(); - } - - @Override - public boolean tryAdvance(IntConsumer consumer) { - return get().tryAdvance(consumer); - } - - @Override - public void forEachRemaining(IntConsumer consumer) { - get().forEachRemaining(consumer); - } - } - - static final class OfLong extends DelegatingSpliterator implements Spliterator.OfLong { - private Spliterator.OfLong s; - - OfLong(Supplier supplier) { + static class OfPrimitive> + extends DelegatingSpliterator + implements Spliterator.OfPrimitive { + OfPrimitive(Supplier supplier) { super(supplier); } @Override - Spliterator.OfLong get() { - if (s == null) { - s = (Spliterator.OfLong) super.get(); - } - return s; - } - - @Override - public Spliterator.OfLong trySplit() { - return get().trySplit(); - } - - @Override - public boolean tryAdvance(LongConsumer consumer) { + public boolean tryAdvance(T_CONS consumer) { return get().tryAdvance(consumer); } @Override - public void forEachRemaining(LongConsumer consumer) { + public void forEachRemaining(T_CONS consumer) { get().forEachRemaining(consumer); } } - static final class OfDouble extends DelegatingSpliterator implements Spliterator.OfDouble { - private Spliterator.OfDouble s; + static final class OfInt + extends OfPrimitive + implements Spliterator.OfInt { + + OfInt(Supplier supplier) { + super(supplier); + } + } + + static final class OfLong + extends OfPrimitive + implements Spliterator.OfLong { + + OfLong(Supplier supplier) { + super(supplier); + } + } + + static final class OfDouble + extends OfPrimitive + implements Spliterator.OfDouble { OfDouble(Supplier supplier) { super(supplier); } + } + } + + /** + * A slice Spliterator from a source Spliterator that reports + * {@code SUBSIZED}. + * + */ + static abstract class SliceSpliterator> { + // The start index of the slice + final long sliceOrigin; + // One past the last index of the slice + final long sliceFence; + + // The spliterator to slice + T_SPLITR s; + // current (absolute) index, modified on advance/split + long index; + // one past last (absolute) index or sliceFence, which ever is smaller + long fence; + + SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence) { + assert s.hasCharacteristics(Spliterator.SUBSIZED); + this.s = s; + this.sliceOrigin = sliceOrigin; + this.sliceFence = sliceFence; + this.index = origin; + this.fence = fence; + } + + protected abstract T_SPLITR makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence); + + public T_SPLITR trySplit() { + if (sliceOrigin >= fence) + return null; + + if (index >= fence) + return null; + + // Keep splitting until the left and right splits intersect with the slice + // thereby ensuring the size estimate decreases. + // This also avoids creating empty spliterators which can result in + // existing and additionally created F/J tasks that perform + // redundant work on no elements. + while (true) { + T_SPLITR leftSplit = (T_SPLITR) s.trySplit(); + if (leftSplit == null) + return null; + + long leftSplitFenceUnbounded = index + leftSplit.estimateSize(); + long leftSplitFence = Math.min(leftSplitFenceUnbounded, sliceFence); + if (sliceOrigin >= leftSplitFence) { + // The left split does not intersect with, and is to the left of, the slice + // The right split does intersect + // Discard the left split and split further with the right split + index = leftSplitFence; + } + else if (leftSplitFence >= sliceFence) { + // The right split does not intersect with, and is to the right of, the slice + // The left split does intersect + // Discard the right split and split further with the left split + s = leftSplit; + fence = leftSplitFence; + } + else if (index >= sliceOrigin && leftSplitFenceUnbounded <= sliceFence) { + // The left split is contained within the slice, return the underlying left split + // Right split is contained within or intersects with the slice + index = leftSplitFence; + return leftSplit; + } else { + // The left split intersects with the slice + // Right split is contained within or intersects with the slice + return makeSpliterator(leftSplit, sliceOrigin, sliceFence, index, index = leftSplitFence); + } + } + } + + public long estimateSize() { + return (sliceOrigin < fence) + ? fence - Math.max(sliceOrigin, index) : 0; + } + + public int characteristics() { + return s.characteristics(); + } + + static final class OfRef + extends SliceSpliterator> + implements Spliterator { + + OfRef(Spliterator s, long sliceOrigin, long sliceFence) { + this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence)); + } + + private OfRef(Spliterator s, + long sliceOrigin, long sliceFence, long origin, long fence) { + super(s, sliceOrigin, sliceFence, origin, fence); + } + + @Override + protected Spliterator makeSpliterator(Spliterator s, + long sliceOrigin, long sliceFence, + long origin, long fence) { + return new OfRef<>(s, sliceOrigin, sliceFence, origin, fence); + } + + @Override + public boolean tryAdvance(Consumer action) { + if (sliceOrigin >= fence) + return false; + + while (sliceOrigin > index) { + s.tryAdvance(e -> {}); + index++; + } + + if (index >= fence) + return false; + + index++; + return s.tryAdvance(action); + } + + @Override + public void forEachRemaining(Consumer action) { + if (sliceOrigin >= fence) + return; + + if (index >= fence) + return; + + if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) { + // The spliterator is contained within the slice + s.forEachRemaining(action); + index = fence; + } else { + // The spliterator intersects with the slice + while (sliceOrigin > index) { + s.tryAdvance(e -> {}); + index++; + } + // Traverse elements up to the fence + for (;index < fence; index++) { + s.tryAdvance(action); + } + } + } + } + + static abstract class OfPrimitive, + T_CONS> + extends SliceSpliterator + implements Spliterator.OfPrimitive { + + OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence) { + this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence)); + } + + private OfPrimitive(T_SPLITR s, + long sliceOrigin, long sliceFence, long origin, long fence) { + super(s, sliceOrigin, sliceFence, origin, fence); + } + + @Override + public boolean tryAdvance(T_CONS action) { + if (sliceOrigin >= fence) + return false; + + while (sliceOrigin > index) { + s.tryAdvance(emptyConsumer()); + index++; + } + + if (index >= fence) + return false; + + index++; + return s.tryAdvance(action); + } + + @Override + public void forEachRemaining(T_CONS action) { + if (sliceOrigin >= fence) + return; + + if (index >= fence) + return; + + if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) { + // The spliterator is contained within the slice + s.forEachRemaining(action); + index = fence; + } else { + // The spliterator intersects with the slice + while (sliceOrigin > index) { + s.tryAdvance(emptyConsumer()); + index++; + } + // Traverse elements up to the fence + for (;index < fence; index++) { + s.tryAdvance(action); + } + } + } + + protected abstract T_CONS emptyConsumer(); + } + + static final class OfInt extends OfPrimitive + implements Spliterator.OfInt { + OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence) { + super(s, sliceOrigin, sliceFence); + } + + OfInt(Spliterator.OfInt s, + long sliceOrigin, long sliceFence, long origin, long fence) { + super(s, sliceOrigin, sliceFence, origin, fence); + } + + @Override + protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s, + long sliceOrigin, long sliceFence, + long origin, long fence) { + return new SliceSpliterator.OfInt(s, sliceOrigin, sliceFence, origin, fence); + } + + @Override + protected IntConsumer emptyConsumer() { + return e -> {}; + } + } + + static final class OfLong extends OfPrimitive + implements Spliterator.OfLong { + OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence) { + super(s, sliceOrigin, sliceFence); + } + + OfLong(Spliterator.OfLong s, + long sliceOrigin, long sliceFence, long origin, long fence) { + super(s, sliceOrigin, sliceFence, origin, fence); + } + + @Override + protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s, + long sliceOrigin, long sliceFence, + long origin, long fence) { + return new SliceSpliterator.OfLong(s, sliceOrigin, sliceFence, origin, fence); + } + + @Override + protected LongConsumer emptyConsumer() { + return e -> {}; + } + } + + static final class OfDouble extends OfPrimitive + implements Spliterator.OfDouble { + OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence) { + super(s, sliceOrigin, sliceFence); + } + + OfDouble(Spliterator.OfDouble s, + long sliceOrigin, long sliceFence, long origin, long fence) { + super(s, sliceOrigin, sliceFence, origin, fence); + } + + @Override + protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s, + long sliceOrigin, long sliceFence, + long origin, long fence) { + return new SliceSpliterator.OfDouble(s, sliceOrigin, sliceFence, origin, fence); + } @Override - Spliterator.OfDouble get() { - if (s == null) { - s = (Spliterator.OfDouble) super.get(); + protected DoubleConsumer emptyConsumer() { + return e -> {}; + } + } + } + + /** + * A slice Spliterator that does not preserve order, if any, of a source + * Spliterator. + * + * Note: The source spliterator may report {@code ORDERED} since that + * spliterator be the result of a previous pipeline stage that was + * collected to a {@code Node}. It is the order of the pipeline stage + * that governs whether the this slice spliterator is to be used or not. + */ + static abstract class UnorderedSliceSpliterator> { + static final int CHUNK_SIZE = 1 << 7; + + // The spliterator to slice + protected final T_SPLITR s; + protected final boolean unlimited; + private final long skipThreshold; + private final AtomicLong permits; + + UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) { + this.s = s; + this.unlimited = limit < 0; + this.skipThreshold = limit >= 0 ? limit : 0; + this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip); + } + + UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator parent) { + this.s = s; + this.unlimited = parent.unlimited; + this.permits = parent.permits; + this.skipThreshold = parent.skipThreshold; + } + + /** + * Acquire permission to skip or process elements. The caller must + * first acquire the elements, then consult this method for guidance + * as to what to do with the data. + * + *

We use an {@code AtomicLong} to atomically maintain a counter, + * which is initialized as skip+limit if we are limiting, or skip only + * if we are not limiting. The user should consult the method + * {@code checkPermits()} before acquiring data elements. + * + * @param numElements the number of elements the caller has in hand + * @return the number of elements that should be processed; any + * remaining elements should be discarded. + */ + protected final long acquirePermits(long numElements) { + long remainingPermits; + long grabbing; + // permits never increase, and don't decrease below zero + assert numElements > 0; + do { + remainingPermits = permits.get(); + if (remainingPermits == 0) + return unlimited ? numElements : 0; + grabbing = Math.min(remainingPermits, numElements); + } while (grabbing > 0 && + !permits.compareAndSet(remainingPermits, remainingPermits - grabbing)); + + if (unlimited) + return Math.max(numElements - grabbing, 0); + else if (remainingPermits > skipThreshold) + return Math.max(grabbing - (remainingPermits - skipThreshold), 0); + else + return grabbing; + } + + enum PermitStatus { NO_MORE, MAYBE_MORE, UNLIMITED } + + /** Call to check if permits might be available before acquiring data */ + protected final PermitStatus permitStatus() { + if (permits.get() > 0) + return PermitStatus.MAYBE_MORE; + else + return unlimited ? PermitStatus.UNLIMITED : PermitStatus.NO_MORE; + } + + public final T_SPLITR trySplit() { + // Stop splitting when there are no more limit permits + if (permits.get() == 0) + return null; + T_SPLITR split = (T_SPLITR) s.trySplit(); + return split == null ? null : makeSpliterator(split); + } + + protected abstract T_SPLITR makeSpliterator(T_SPLITR s); + + public final long estimateSize() { + return s.estimateSize(); + } + + public final int characteristics() { + return s.characteristics() & + ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED); + } + + static final class OfRef extends UnorderedSliceSpliterator> + implements Spliterator, Consumer { + T tmpSlot; + + OfRef(Spliterator s, long skip, long limit) { + super(s, skip, limit); + } + + OfRef(Spliterator s, OfRef parent) { + super(s, parent); + } + + @Override + public final void accept(T t) { + tmpSlot = t; + } + + @Override + public boolean tryAdvance(Consumer action) { + while (permitStatus() != PermitStatus.NO_MORE) { + if (!s.tryAdvance(this)) + return false; + else if (acquirePermits(1) == 1) { + action.accept(tmpSlot); + tmpSlot = null; + return true; + } + } + return false; + } + + @Override + public void forEachRemaining(Consumer action) { + ArrayBuffer.OfRef sb = null; + PermitStatus permitStatus; + while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { + if (permitStatus == PermitStatus.MAYBE_MORE) { + // Optimistically traverse elements up to a threshold of CHUNK_SIZE + if (sb == null) + sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE); + else + sb.reset(); + long permitsRequested = 0; + do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE); + if (permitsRequested == 0) + return; + sb.forEach(action, acquirePermits(permitsRequested)); + } + else { + // Must be UNLIMITED; let 'er rip + s.forEachRemaining(action); + return; + } + } + } + + @Override + protected Spliterator makeSpliterator(Spliterator s) { + return new UnorderedSliceSpliterator.OfRef<>(s, this); + } + } + + /** + * Concrete sub-types must also be an instance of type {@code T_CONS}. + * + * @param the type of the spined buffer. Must also be a type of + * {@code T_CONS}. + */ + static abstract class OfPrimitive< + T, + T_CONS, + T_BUFF extends ArrayBuffer.OfPrimitive, + T_SPLITR extends Spliterator.OfPrimitive> + extends UnorderedSliceSpliterator + implements Spliterator.OfPrimitive { + OfPrimitive(T_SPLITR s, long skip, long limit) { + super(s, skip, limit); + } + + OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive parent) { + super(s, parent); + } + + @Override + public boolean tryAdvance(T_CONS action) { + while (permitStatus() != PermitStatus.NO_MORE) { + if (!s.tryAdvance((T_CONS) this)) + return false; + else if (acquirePermits(1) == 1) { + acceptConsumed(action); + return true; + } } - return s; + return false; + } + + protected abstract void acceptConsumed(T_CONS action); + + @Override + public void forEachRemaining(T_CONS action) { + T_BUFF sb = null; + PermitStatus permitStatus; + while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { + if (permitStatus == PermitStatus.MAYBE_MORE) { + // Optimistically traverse elements up to a threshold of CHUNK_SIZE + if (sb == null) + sb = bufferCreate(CHUNK_SIZE); + else + sb.reset(); + @SuppressWarnings("unchecked") + T_CONS sbc = (T_CONS) sb; + long permitsRequested = 0; + do { } while (s.tryAdvance(sbc) && ++permitsRequested < CHUNK_SIZE); + if (permitsRequested == 0) + return; + sb.forEach(action, acquirePermits(permitsRequested)); + } + else { + // Must be UNLIMITED; let 'er rip + s.forEachRemaining(action); + return; + } + } + } + + protected abstract T_BUFF bufferCreate(int initialCapacity); + } + + static final class OfInt + extends OfPrimitive + implements Spliterator.OfInt, IntConsumer { + + int tmpValue; + + OfInt(Spliterator.OfInt s, long skip, long limit) { + super(s, skip, limit); + } + + OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent) { + super(s, parent); + } + + @Override + public void accept(int value) { + tmpValue = value; + } + + @Override + protected void acceptConsumed(IntConsumer action) { + action.accept(tmpValue); + } + + @Override + protected ArrayBuffer.OfInt bufferCreate(int initialCapacity) { + return new ArrayBuffer.OfInt(initialCapacity); + } + + @Override + protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) { + return new UnorderedSliceSpliterator.OfInt(s, this); + } + } + + static final class OfLong + extends OfPrimitive + implements Spliterator.OfLong, LongConsumer { + + long tmpValue; + + OfLong(Spliterator.OfLong s, long skip, long limit) { + super(s, skip, limit); + } + + OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent) { + super(s, parent); + } + + @Override + public void accept(long value) { + tmpValue = value; + } + + @Override + protected void acceptConsumed(LongConsumer action) { + action.accept(tmpValue); + } + + @Override + protected ArrayBuffer.OfLong bufferCreate(int initialCapacity) { + return new ArrayBuffer.OfLong(initialCapacity); + } + + @Override + protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) { + return new UnorderedSliceSpliterator.OfLong(s, this); + } + } + + static final class OfDouble + extends OfPrimitive + implements Spliterator.OfDouble, DoubleConsumer { + + double tmpValue; + + OfDouble(Spliterator.OfDouble s, long skip, long limit) { + super(s, skip, limit); + } + + OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent) { + super(s, parent); + } + + @Override + public void accept(double value) { + tmpValue = value; + } + + @Override + protected void acceptConsumed(DoubleConsumer action) { + action.accept(tmpValue); + } + + @Override + protected ArrayBuffer.OfDouble bufferCreate(int initialCapacity) { + return new ArrayBuffer.OfDouble(initialCapacity); + } + + @Override + protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) { + return new UnorderedSliceSpliterator.OfDouble(s, this); + } + } + } + + /** + * A Spliterator that infinitely supplies elements in no particular order. + * + *

Splitting divides the estimated size in two and stops when the + * estimate size is 0. + * + *

The {@code forEachRemaining} method if invoked will never terminate. + * The {@coe tryAdvance} method always returns true. + * + */ + static abstract class InfiniteSupplyingSpliterator implements Spliterator { + long estimate; + + protected InfiniteSupplyingSpliterator(long estimate) { + this.estimate = estimate; + } + + @Override + public long estimateSize() { + return estimate; + } + + @Override + public int characteristics() { + return IMMUTABLE; + } + + static final class OfRef extends InfiniteSupplyingSpliterator { + final Supplier s; + + OfRef(long size, Supplier s) { + super(size); + this.s = s; + } + + @Override + public boolean tryAdvance(Consumer action) { + action.accept(s.get()); + return true; + } + + @Override + public Spliterator trySplit() { + if (estimate == 0) + return null; + return new InfiniteSupplyingSpliterator.OfRef<>(estimate >>>= 1, s); + } + } + + static final class OfInt extends InfiniteSupplyingSpliterator + implements Spliterator.OfInt { + final IntSupplier s; + + OfInt(long size, IntSupplier s) { + super(size); + this.s = s; + } + + @Override + public boolean tryAdvance(IntConsumer action) { + action.accept(s.getAsInt()); + return true; + } + + @Override + public Spliterator.OfInt trySplit() { + if (estimate == 0) + return null; + return new InfiniteSupplyingSpliterator.OfInt(estimate = estimate >>> 1, s); + } + } + + static final class OfLong extends InfiniteSupplyingSpliterator + implements Spliterator.OfLong { + final LongSupplier s; + + OfLong(long size, LongSupplier s) { + super(size); + this.s = s; + } + + @Override + public boolean tryAdvance(LongConsumer action) { + action.accept(s.getAsLong()); + return true; + } + + @Override + public Spliterator.OfLong trySplit() { + if (estimate == 0) + return null; + return new InfiniteSupplyingSpliterator.OfLong(estimate = estimate >>> 1, s); + } + } + + static final class OfDouble extends InfiniteSupplyingSpliterator + implements Spliterator.OfDouble { + final DoubleSupplier s; + + OfDouble(long size, DoubleSupplier s) { + super(size); + this.s = s; + } + + @Override + public boolean tryAdvance(DoubleConsumer action) { + action.accept(s.getAsDouble()); + return true; } @Override public Spliterator.OfDouble trySplit() { - return get().trySplit(); + if (estimate == 0) + return null; + return new InfiniteSupplyingSpliterator.OfDouble(estimate = estimate >>> 1, s); + } + } + } + + // @@@ Consolidate with Node.Builder + static abstract class ArrayBuffer { + int index; + + void reset() { + index = 0; + } + + static final class OfRef extends ArrayBuffer implements Consumer { + final Object[] array; + + OfRef(int size) { + this.array = new Object[size]; + } + + @Override + public void accept(T t) { + array[index++] = t; + } + + public void forEach(Consumer action, long fence) { + for (int i = 0; i < fence; i++) { + @SuppressWarnings("unchecked") + T t = (T) array[i]; + action.accept(t); + } + } + } + + static abstract class OfPrimitive extends ArrayBuffer { + int index; + + @Override + void reset() { + index = 0; + } + + abstract void forEach(T_CONS action, long fence); + } + + static final class OfInt extends OfPrimitive + implements IntConsumer { + final int[] array; + + OfInt(int size) { + this.array = new int[size]; } @Override - public boolean tryAdvance(DoubleConsumer consumer) { - return get().tryAdvance(consumer); + public void accept(int t) { + array[index++] = t; + } + + @Override + public void forEach(IntConsumer action, long fence) { + for (int i = 0; i < fence; i++) { + action.accept(array[i]); + } + } + } + + static final class OfLong extends OfPrimitive + implements LongConsumer { + final long[] array; + + OfLong(int size) { + this.array = new long[size]; + } + + @Override + public void accept(long t) { + array[index++] = t; } @Override - public void forEachRemaining(DoubleConsumer consumer) { - get().forEachRemaining(consumer); + public void forEach(LongConsumer action, long fence) { + for (int i = 0; i < fence; i++) { + action.accept(array[i]); + } + } + } + + static final class OfDouble extends OfPrimitive + implements DoubleConsumer { + final double[] array; + + OfDouble(int size) { + this.array = new double[size]; + } + + @Override + public void accept(double t) { + array[index++] = t; + } + + @Override + void forEach(DoubleConsumer action, long fence) { + for (int i = 0; i < fence; i++) { + action.accept(array[i]); + } } } } -} +} \ No newline at end of file diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java --- a/jdk/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java Fri Jun 28 10:29:21 2013 +0200 @@ -79,11 +79,11 @@ * test. * * @param actual the actual result - * @param excepted the expected result + * @param expected the expected result * @param isOrdered true if the pipeline is ordered * @param isParallel true if the pipeline is parallel */ - void assertResult(R actual, R excepted, boolean isOrdered, boolean isParallel); + void assertResult(R actual, R expected, boolean isOrdered, boolean isParallel); } // Exercise stream operations diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/test/java/util/stream/bootlib/java/util/stream/SpliteratorTestHelper.java --- a/jdk/test/java/util/stream/bootlib/java/util/stream/SpliteratorTestHelper.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/test/java/util/stream/bootlib/java/util/stream/SpliteratorTestHelper.java Fri Jun 28 10:29:21 2013 +0200 @@ -42,11 +42,33 @@ */ public class SpliteratorTestHelper { + public interface ContentAsserter { + void assertContents(Collection actual, Collection expected, boolean isOrdered); + } + + private static ContentAsserter DEFAULT_CONTENT_ASSERTER + = SpliteratorTestHelper::assertContents; + + @SuppressWarnings("unchecked") + private static ContentAsserter defaultContentAsserter() { + return (ContentAsserter) DEFAULT_CONTENT_ASSERTER; + } + public static void testSpliterator(Supplier> supplier) { - testSpliterator(supplier, (Consumer b) -> b); + testSpliterator(supplier, defaultContentAsserter()); + } + + public static void testSpliterator(Supplier> supplier, + ContentAsserter asserter) { + testSpliterator(supplier, (Consumer b) -> b, asserter); } public static void testIntSpliterator(Supplier supplier) { + testIntSpliterator(supplier, defaultContentAsserter()); + } + + public static void testIntSpliterator(Supplier supplier, + ContentAsserter asserter) { class BoxingAdapter implements Consumer, IntConsumer { private final Consumer b; @@ -65,10 +87,15 @@ } } - testSpliterator(supplier, BoxingAdapter::new); + testSpliterator(supplier, BoxingAdapter::new, asserter); } public static void testLongSpliterator(Supplier supplier) { + testLongSpliterator(supplier, defaultContentAsserter()); + } + + public static void testLongSpliterator(Supplier supplier, + ContentAsserter asserter) { class BoxingAdapter implements Consumer, LongConsumer { private final Consumer b; @@ -87,10 +114,15 @@ } } - testSpliterator(supplier, BoxingAdapter::new); + testSpliterator(supplier, BoxingAdapter::new, asserter); } public static void testDoubleSpliterator(Supplier supplier) { + testDoubleSpliterator(supplier, defaultContentAsserter()); + } + + public static void testDoubleSpliterator(Supplier supplier, + ContentAsserter asserter) { class BoxingAdapter implements Consumer, DoubleConsumer { private final Consumer b; @@ -109,11 +141,12 @@ } } - testSpliterator(supplier, BoxingAdapter::new); + testSpliterator(supplier, BoxingAdapter::new, asserter); } static > void testSpliterator(Supplier supplier, - UnaryOperator> boxingAdapter) { + UnaryOperator> boxingAdapter, + ContentAsserter asserter) { ArrayList fromForEach = new ArrayList<>(); Spliterator spliterator = supplier.get(); Consumer addToFromForEach = boxingAdapter.apply(fromForEach::add); @@ -121,14 +154,14 @@ Collection exp = Collections.unmodifiableList(fromForEach); - testForEach(exp, supplier, boxingAdapter); - testTryAdvance(exp, supplier, boxingAdapter); - testMixedTryAdvanceForEach(exp, supplier, boxingAdapter); - testMixedTraverseAndSplit(exp, supplier, boxingAdapter); + testForEach(exp, supplier, boxingAdapter, asserter); + testTryAdvance(exp, supplier, boxingAdapter, asserter); + testMixedTryAdvanceForEach(exp, supplier, boxingAdapter, asserter); + testMixedTraverseAndSplit(exp, supplier, boxingAdapter, asserter); testSplitAfterFullTraversal(supplier, boxingAdapter); - testSplitOnce(exp, supplier, boxingAdapter); - testSplitSixDeep(exp, supplier, boxingAdapter); - testSplitUntilNull(exp, supplier, boxingAdapter); + testSplitOnce(exp, supplier, boxingAdapter, asserter); + testSplitSixDeep(exp, supplier, boxingAdapter, asserter); + testSplitUntilNull(exp, supplier, boxingAdapter, asserter); } // @@ -136,7 +169,8 @@ private static > void testForEach( Collection exp, Supplier supplier, - UnaryOperator> boxingAdapter) { + UnaryOperator> boxingAdapter, + ContentAsserter asserter) { S spliterator = supplier.get(); long sizeIfKnown = spliterator.getExactSizeIfKnown(); boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED); @@ -159,13 +193,14 @@ } assertEquals(fromForEach.size(), exp.size()); - assertContents(fromForEach, exp, isOrdered); + asserter.assertContents(fromForEach, exp, isOrdered); } private static > void testTryAdvance( Collection exp, Supplier supplier, - UnaryOperator> boxingAdapter) { + UnaryOperator> boxingAdapter, + ContentAsserter asserter) { S spliterator = supplier.get(); long sizeIfKnown = spliterator.getExactSizeIfKnown(); boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED); @@ -188,13 +223,14 @@ } assertEquals(fromTryAdvance.size(), exp.size()); - assertContents(fromTryAdvance, exp, isOrdered); + asserter.assertContents(fromTryAdvance, exp, isOrdered); } private static > void testMixedTryAdvanceForEach( Collection exp, Supplier supplier, - UnaryOperator> boxingAdapter) { + UnaryOperator> boxingAdapter, + ContentAsserter asserter) { S spliterator = supplier.get(); long sizeIfKnown = spliterator.getExactSizeIfKnown(); boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED); @@ -218,18 +254,14 @@ } assertEquals(dest.size(), exp.size()); - if (isOrdered) { - assertEquals(dest, exp); - } - else { - assertContentsUnordered(dest, exp); - } + asserter.assertContents(dest, exp, isOrdered); } private static > void testMixedTraverseAndSplit( Collection exp, Supplier supplier, - UnaryOperator> boxingAdapter) { + UnaryOperator> boxingAdapter, + ContentAsserter asserter) { S spliterator = supplier.get(); long sizeIfKnown = spliterator.getExactSizeIfKnown(); boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED); @@ -266,12 +298,7 @@ } assertEquals(dest.size(), exp.size()); - if (isOrdered) { - assertEquals(dest, exp); - } - else { - assertContentsUnordered(dest, exp); - } + asserter.assertContents(dest, exp, isOrdered); } private static > void testSplitAfterFullTraversal( @@ -285,16 +312,14 @@ // Full traversal using forEach spliterator = supplier.get(); - spliterator.forEachRemaining(boxingAdapter.apply(e -> { - })); + spliterator.forEachRemaining(boxingAdapter.apply(e -> { })); split = spliterator.trySplit(); assertNull(split); // Full traversal using tryAdvance then forEach spliterator = supplier.get(); spliterator.tryAdvance(boxingAdapter.apply(e -> { })); - spliterator.forEachRemaining(boxingAdapter.apply(e -> { - })); + spliterator.forEachRemaining(boxingAdapter.apply(e -> { })); split = spliterator.trySplit(); assertNull(split); } @@ -302,7 +327,8 @@ private static > void testSplitOnce( Collection exp, Supplier supplier, - UnaryOperator> boxingAdapter) { + UnaryOperator> boxingAdapter, + ContentAsserter asserter) { S spliterator = supplier.get(); long sizeIfKnown = spliterator.getExactSizeIfKnown(); boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED); @@ -322,13 +348,15 @@ if (s1Size >= 0 && s2Size >= 0) assertEquals(sizeIfKnown, s1Size + s2Size); } - assertContents(fromSplit, exp, isOrdered); + + asserter.assertContents(fromSplit, exp, isOrdered); } private static > void testSplitSixDeep( Collection exp, Supplier supplier, - UnaryOperator> boxingAdapter) { + UnaryOperator> boxingAdapter, + ContentAsserter asserter) { S spliterator = supplier.get(); boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED); @@ -340,13 +368,13 @@ // verify splitting with forEach splitSixDeepVisitor(depth, 0, dest, spliterator, boxingAdapter, spliterator.characteristics(), false); - assertContents(dest, exp, isOrdered); + asserter.assertContents(dest, exp, isOrdered); // verify splitting with tryAdvance dest.clear(); spliterator = supplier.get(); splitSixDeepVisitor(depth, 0, dest, spliterator, boxingAdapter, spliterator.characteristics(), true); - assertContents(dest, exp, isOrdered); + asserter.assertContents(dest, exp, isOrdered); } } @@ -411,7 +439,8 @@ private static > void testSplitUntilNull( Collection exp, Supplier supplier, - UnaryOperator> boxingAdapter) { + UnaryOperator> boxingAdapter, + ContentAsserter asserter) { Spliterator s = supplier.get(); boolean isOrdered = s.hasCharacteristics(Spliterator.ORDERED); assertSpliterator(s); @@ -420,7 +449,7 @@ Consumer c = boxingAdapter.apply(splits::add); testSplitUntilNull(new SplitNode(c, s)); - assertContents(splits, exp, isOrdered); + asserter.assertContents(splits, exp, isOrdered); } private static class SplitNode { @@ -540,23 +569,10 @@ assertEquals(actual, expected); } else { - assertContentsUnordered(actual, expected); + LambdaTestHelpers.assertContentsUnordered(actual, expected); } } - private static void assertContentsUnordered(Iterable actual, Iterable expected) { - assertEquals(toBoxedMultiset(actual), toBoxedMultiset(expected)); - } - - private static Map toBoxedMultiset(Iterable c) { - Map result = new HashMap<>(); - c.forEach(e -> { - if (result.containsKey(e)) result.put(e, result.get(e) + 1); - else result.put(e, 1); - }); - return result; - } - static void mixedTraverseAndSplit(Consumer b, Spliterator splTop) { Spliterator spl1, spl2, spl3; splTop.tryAdvance(b); diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/test/java/util/stream/boottest/java/util/stream/SliceSpliteratorTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/util/stream/boottest/java/util/stream/SliceSpliteratorTest.java Fri Jun 28 10:29:21 2013 +0200 @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Spliterator; + +import static java.util.stream.Collectors.toList; +import static org.testng.Assert.assertEquals; + +/** + * @bug 8012987 + */ +@Test +public class SliceSpliteratorTest extends LoggingTestCase { + + static class UnorderedContentAsserter implements SpliteratorTestHelper.ContentAsserter { + Collection source; + + UnorderedContentAsserter(Collection source) { + this.source = source; + } + + @Override + public void assertContents(Collection actual, Collection expected, boolean isOrdered) { + if (isOrdered) { + assertEquals(actual, expected); + } + else { + assertEquals(actual.size(), expected.size()); + assertTrue(source.containsAll(actual)); + } + } + } + + interface SliceTester { + void test(int size, int skip, int limit); + } + + @DataProvider(name = "sliceSpliteratorDataProvider") + public static Object[][] sliceSpliteratorDataProvider() { + List data = new ArrayList<>(); + + // SIZED/SUBSIZED slice spliterator + + { + SliceTester r = (size, skip, limit) -> { + final Collection source = IntStream.range(0, size).boxed().collect(toList()); + + SpliteratorTestHelper.testSpliterator(() -> { + Spliterator s = Arrays.spliterator(source.stream().toArray(Integer[]::new)); + + return new StreamSpliterators.SliceSpliterator.OfRef<>(s, skip, limit); + }); + }; + data.add(new Object[]{"StreamSpliterators.SliceSpliterator.OfRef", r}); + } + + { + SliceTester r = (size, skip, limit) -> { + final Collection source = IntStream.range(0, size).boxed().collect(toList()); + + SpliteratorTestHelper.testIntSpliterator(() -> { + Spliterator.OfInt s = Arrays.spliterator(source.stream().mapToInt(i->i).toArray()); + + return new StreamSpliterators.SliceSpliterator.OfInt(s, skip, limit); + }); + }; + data.add(new Object[]{"StreamSpliterators.SliceSpliterator.OfInt", r}); + } + + { + SliceTester r = (size, skip, limit) -> { + final Collection source = LongStream.range(0, size).boxed().collect(toList()); + + SpliteratorTestHelper.testLongSpliterator(() -> { + Spliterator.OfLong s = Arrays.spliterator(source.stream().mapToLong(i->i).toArray()); + + return new StreamSpliterators.SliceSpliterator.OfLong(s, skip, limit); + }); + }; + data.add(new Object[]{"StreamSpliterators.SliceSpliterator.OfLong", r}); + } + + { + SliceTester r = (size, skip, limit) -> { + final Collection source = LongStream.range(0, size).asDoubleStream().boxed().collect(toList()); + + SpliteratorTestHelper.testDoubleSpliterator(() -> { + Spliterator.OfDouble s = Arrays.spliterator(source.stream().mapToDouble(i->i).toArray()); + + return new StreamSpliterators.SliceSpliterator.OfDouble(s, skip, limit); + }); + }; + data.add(new Object[]{"StreamSpliterators.SliceSpliterator.OfLong", r}); + } + + + // Unordered slice spliterator + + { + SliceTester r = (size, skip, limit) -> { + final Collection source = IntStream.range(0, size).boxed().collect(toList()); + final UnorderedContentAsserter uca = new UnorderedContentAsserter<>(source); + + SpliteratorTestHelper.testSpliterator(() -> { + Spliterator s = Arrays.spliterator(source.stream().toArray(Integer[]::new)); + + return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit); + }, uca); + }; + data.add(new Object[]{"StreamSpliterators.UnorderedSliceSpliterator.OfRef", r}); + } + + { + SliceTester r = (size, skip, limit) -> { + final Collection source = IntStream.range(0, size).boxed().collect(toList()); + final UnorderedContentAsserter uca = new UnorderedContentAsserter<>(source); + + SpliteratorTestHelper.testIntSpliterator(() -> { + Spliterator.OfInt s = Arrays.spliterator(source.stream().mapToInt(i->i).toArray()); + + return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit); + }, uca); + }; + data.add(new Object[]{"StreamSpliterators.UnorderedSliceSpliterator.OfInt", r}); + } + + { + SliceTester r = (size, skip, limit) -> { + final Collection source = LongStream.range(0, size).boxed().collect(toList()); + final UnorderedContentAsserter uca = new UnorderedContentAsserter<>(source); + + SpliteratorTestHelper.testLongSpliterator(() -> { + Spliterator.OfLong s = Arrays.spliterator(source.stream().mapToLong(i->i).toArray()); + + return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit); + }, uca); + }; + data.add(new Object[]{"StreamSpliterators.UnorderedSliceSpliterator.OfLong", r}); + } + + { + SliceTester r = (size, skip, limit) -> { + final Collection source = LongStream.range(0, size).asDoubleStream().boxed().collect(toList()); + final UnorderedContentAsserter uca = new UnorderedContentAsserter<>(source); + + SpliteratorTestHelper.testDoubleSpliterator(() -> { + Spliterator.OfDouble s = Arrays.spliterator(LongStream.range(0, SIZE).asDoubleStream().toArray()); + + return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit); + }, uca); + }; + data.add(new Object[]{"StreamSpliterators.UnorderedSliceSpliterator.OfLong", r}); + } + + return data.toArray(new Object[0][]); + } + + static final int SIZE = 256; + + static final int STEP = 32; + + @Test(dataProvider = "sliceSpliteratorDataProvider") + public void testSliceSpliterator(String description, SliceTester r) { + setContext("size", SIZE); + for (int skip = 0; skip < SIZE; skip += STEP) { + setContext("skip", skip); + for (int limit = 0; limit < SIZE; limit += STEP) { + setContext("limit", skip); + r.test(SIZE, skip, limit); + } + } + } +} diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/test/java/util/stream/boottest/java/util/stream/StreamFlagsTest.java --- a/jdk/test/java/util/stream/boottest/java/util/stream/StreamFlagsTest.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/test/java/util/stream/boottest/java/util/stream/StreamFlagsTest.java Fri Jun 28 10:29:21 2013 +0200 @@ -80,8 +80,8 @@ EnumSet.of(ORDERED, DISTINCT, SIZED), EnumSet.of(SORTED, SHORT_CIRCUIT)); assertFlags(OpTestCase.getStreamFlags(repeat), - EnumSet.of(ORDERED), - EnumSet.of(SIZED, DISTINCT, SORTED, SHORT_CIRCUIT)); + EnumSet.noneOf(StreamOpFlag.class), + EnumSet.of(DISTINCT, SORTED, SHORT_CIRCUIT)); } public void testFilter() { diff -r 8e3cb3c46ae8 -r 53b8b8c30086 jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java --- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java Tue Jun 11 13:41:38 2013 -0700 +++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java Fri Jun 28 10:29:21 2013 +0200 @@ -22,45 +22,440 @@ */ package org.openjdk.tests.java.util.stream; -import java.util.stream.OpTestCase; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.util.Arrays; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; +import java.util.Spliterator; +import java.util.function.Function; +import java.util.function.UnaryOperator; +import java.util.stream.DoubleStream; +import java.util.stream.DoubleStreamTestScenario; +import java.util.stream.IntStream; +import java.util.stream.IntStreamTestScenario; +import java.util.stream.LambdaTestHelpers; +import java.util.stream.LongStream; +import java.util.stream.LongStreamTestScenario; +import java.util.stream.OpTestCase; import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import java.util.stream.StreamTestScenario; +import java.util.stream.TestData; -import static java.util.stream.LambdaTestHelpers.assertContents; +import static java.util.stream.LambdaTestHelpers.assertUnique; @Test public class InfiniteStreamWithLimitOpTest extends OpTestCase { - private static final List tenAs = Arrays.asList("A", "A", "A", "A", "A", "A", "A", "A", "A", "A"); + private static final long SKIP_LIMIT_SIZE = 1 << 16; + + @DataProvider(name = "Stream.limit") + @SuppressWarnings("rawtypes") + public static Object[][] sliceFunctionsDataProvider() { + Function f = s -> String.format(s, SKIP_LIMIT_SIZE); + + List data = new ArrayList<>(); + + data.add(new Object[]{f.apply("Stream.limit(%d)"), + (UnaryOperator) s -> s.limit(SKIP_LIMIT_SIZE)}); + data.add(new Object[]{f.apply("Stream.substream(%d)"), + (UnaryOperator) s -> s.substream(SKIP_LIMIT_SIZE, SKIP_LIMIT_SIZE * 2)}); + data.add(new Object[]{f.apply("Stream.substream(%1$d).limit(%1$d)"), + (UnaryOperator) s -> s.substream(SKIP_LIMIT_SIZE).limit(SKIP_LIMIT_SIZE)}); + + return data.toArray(new Object[0][]); + } + + @DataProvider(name = "IntStream.limit") + public static Object[][] intSliceFunctionsDataProvider() { + Function f = s -> String.format(s, SKIP_LIMIT_SIZE); + + List data = new ArrayList<>(); + + data.add(new Object[]{f.apply("IntStream.limit(%d)"), + (UnaryOperator) s -> s.limit(SKIP_LIMIT_SIZE)}); + data.add(new Object[]{f.apply("IntStream.substream(%d)"), + (UnaryOperator) s -> s.substream(SKIP_LIMIT_SIZE, SKIP_LIMIT_SIZE * 2)}); + data.add(new Object[]{f.apply("IntStream.substream(%1$d).limit(%1$d)"), + (UnaryOperator) s -> s.substream(SKIP_LIMIT_SIZE).limit(SKIP_LIMIT_SIZE)}); + + return data.toArray(new Object[0][]); + } + + @DataProvider(name = "LongStream.limit") + public static Object[][] longSliceFunctionsDataProvider() { + Function f = s -> String.format(s, SKIP_LIMIT_SIZE); + + List data = new ArrayList<>(); + + data.add(new Object[]{f.apply("LongStream.limit(%d)"), + (UnaryOperator) s -> s.limit(SKIP_LIMIT_SIZE)}); + data.add(new Object[]{f.apply("LongStream.substream(%d)"), + (UnaryOperator) s -> s.substream(SKIP_LIMIT_SIZE, SKIP_LIMIT_SIZE * 2)}); + data.add(new Object[]{f.apply("LongStream.substream(%1$d).limit(%1$d)"), + (UnaryOperator) s -> s.substream(SKIP_LIMIT_SIZE).limit(SKIP_LIMIT_SIZE)}); - public void testRepeatLimit() { - assertContents(Stream.generate(() -> "A").limit(10).iterator(), tenAs.iterator()); + return data.toArray(new Object[0][]); + } + + @DataProvider(name = "DoubleStream.limit") + public static Object[][] doubleSliceFunctionsDataProvider() { + Function f = s -> String.format(s, SKIP_LIMIT_SIZE); + + List data = new ArrayList<>(); + + data.add(new Object[]{f.apply("DoubleStream.limit(%d)"), + (UnaryOperator) s -> s.limit(SKIP_LIMIT_SIZE)}); + data.add(new Object[]{f.apply("DoubleStream.substream(%d)"), + (UnaryOperator) s -> s.substream(SKIP_LIMIT_SIZE, SKIP_LIMIT_SIZE * 2)}); + data.add(new Object[]{f.apply("DoubleStream.substream(%1$d).limit(%1$d)"), + (UnaryOperator) s -> s.substream(SKIP_LIMIT_SIZE).limit(SKIP_LIMIT_SIZE)}); + + return data.toArray(new Object[0][]); + } + + private ResultAsserter> unorderedAsserter() { + return (act, exp, ord, par) -> { + if (par & !ord) { + // Can only assert that all elements of the actual result + // are distinct and that the count is the limit size + // any element within the range [0, Long.MAX_VALUE) may be + // present + assertUnique(act); + long count = 0; + for (T l : act) { + count++; + } + assertEquals(count, SKIP_LIMIT_SIZE, "size not equal"); + } + else { + LambdaTestHelpers.assertContents(act, exp); + } + }; + } + + private TestData.OfRef refLongs() { + return refLongRange(0, Long.MAX_VALUE); + } + + private TestData.OfRef refLongRange(long l, long u) { + return TestData.Factory.ofSupplier( + String.format("[%d, %d)", l, u), + () -> LongStream.range(l, u).boxed()); } - public void testIterateLimit() { - assertContents(Stream.iterate("A", s -> s).limit(10).iterator(), tenAs.iterator()); + private TestData.OfInt ints() { + return intRange(0, Integer.MAX_VALUE); + } + + private TestData.OfInt intRange(int l, int u) { + return TestData.Factory.ofIntSupplier( + String.format("[%d, %d)", l, u), + () -> IntStream.range(l, u)); + } + + private TestData.OfLong longs() { + return longRange(0, Long.MAX_VALUE); + } + + private TestData.OfLong longRange(long l, long u) { + return TestData.Factory.ofLongSupplier( + String.format("[%d, %d)", l, u), + () -> LongStream.range(l, u)); + } + + private TestData.OfDouble doubles() { + return doubleRange(0, 1L << 53); + } + + private TestData.OfDouble doubleRange(long l, long u) { + return TestData.Factory.ofDoubleSupplier( + String.format("[%d, %d)", l, u), + () -> LongStream.range(l, u).mapToDouble(i -> (double) i)); + } + + + // Sized/subsized range + + @Test(dataProvider = "Stream.limit") + public void testSubsizedWithRange(String description, UnaryOperator> fs) { + // Range is [0, Long.MAX_VALUE), splits are SUBSIZED + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(refLongs()). + stream(s -> fs.apply(s)). + without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + exercise(); + } + + @Test(dataProvider = "IntStream.limit") + public void testIntSubsizedWithRange(String description, UnaryOperator fs) { + // Range is [0, Integer.MAX_VALUE), splits are SUBSIZED + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(ints()). + stream(s -> fs.apply(s)). + without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + exercise(); + } + + @Test(dataProvider = "LongStream.limit") + public void testLongSubsizedWithRange(String description, UnaryOperator fs) { + // Range is [0, Long.MAX_VALUE), splits are SUBSIZED + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(longs()). + stream(s -> fs.apply(s)). + without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + exercise(); + } + + @Test(dataProvider = "DoubleStream.limit") + public void testDoubleSubsizedWithRange(String description, UnaryOperator fs) { + // Range is [0, 2^53), splits are SUBSIZED + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(doubles()). + stream(s -> fs.apply(s)). + without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + exercise(); + } + + + // Unordered finite not SIZED/SUBSIZED + + @Test(dataProvider = "Stream.limit") + public void testUnorderedFinite(String description, UnaryOperator> fs) { + // Range is [0, Long.MAX_VALUE), splits are SUBSIZED + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(longs()). + stream(s -> fs.apply(s.filter(i -> true).unordered().boxed())). + resultAsserter(unorderedAsserter()). + exercise(); + } + + @Test(dataProvider = "IntStream.limit") + public void testIntUnorderedFinite(String description, UnaryOperator fs) { + // Range is [0, Integer.MAX_VALUE), splits are SUBSIZED + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(ints()). + stream(s -> fs.apply(s.filter(i -> true).unordered())). + resultAsserter(unorderedAsserter()). + exercise(); } - public void testIterateFibLimit() { - Stream fib = Stream.iterate(new int[] {0, 1}, pair -> new int[] {pair[1], pair[0] + pair[1]}) - .map(pair -> pair[0]); + @Test(dataProvider = "LongStream.limit") + public void testLongUnorderedFinite(String description, UnaryOperator fs) { + // Range is [0, Long.MAX_VALUE), splits are SUBSIZED + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(longs()). + stream(s -> fs.apply(s.filter(i -> true).unordered())). + resultAsserter(unorderedAsserter()). + exercise(); + } + + @Test(dataProvider = "DoubleStream.limit") + public void testDoubleUnorderedFinite(String description, UnaryOperator fs) { + // Range is [0, 1L << 53), splits are SUBSIZED + // Such a size will induce out of memory errors for incorrect + // slice implementations + // Upper bound ensures values mapped to doubles will be unique + withData(doubles()). + stream(s -> fs.apply(s.filter(i -> true).unordered())). + resultAsserter(unorderedAsserter()). + exercise(); + } + + + // Unordered finite not SUBSIZED - assertContents( - fib.limit(10).iterator(), - Arrays.asList(0, 1, 1, 2, 3, 5, 8, 13, 21, 34).iterator()); + @SuppressWarnings({"rawtypes", "unchecked"}) + private Spliterator.OfLong proxyNotSubsized(Spliterator.OfLong s) { + InvocationHandler ih = new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + switch (method.getName()) { + case "characteristics": { + int c = (Integer) method.invoke(s, args); + return c & ~Spliterator.SUBSIZED; + } + case "hasCharacteristics": { + int c = (Integer) args[0]; + boolean b = (Boolean) method.invoke(s, args); + return b & ((c & Spliterator.SUBSIZED) == 0); + } + default: + return method.invoke(s, args); + } + } + }; + + return (Spliterator.OfLong) Proxy.newProxyInstance(this.getClass().getClassLoader(), + new Class[]{Spliterator.OfLong.class}, + ih); + } + + private TestData.OfLong proxiedLongRange(long l, long u) { + return TestData.Factory.ofLongSupplier( + String.format("[%d, %d)", l, u), + () -> StreamSupport.longStream(proxyNotSubsized(LongStream.range(l, u).spliterator()))); + } + + @Test(dataProvider = "Stream.limit") + public void testUnorderedSizedNotSubsizedFinite(String description, UnaryOperator> fs) { + // Range is [0, Long.MAX_VALUE), splits are not SUBSIZED (proxy clears + // the SUBSIZED characteristic) + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(proxiedLongRange(0, Long.MAX_VALUE)). + stream(s -> fs.apply(s.unordered().boxed())). + resultAsserter(unorderedAsserter()). + exercise(); + } + + @Test(dataProvider = "IntStream.limit") + public void testIntUnorderedSizedNotSubsizedFinite(String description, UnaryOperator fs) { + // Range is [0, Integer.MAX_VALUE), splits are not SUBSIZED (proxy clears + // the SUBSIZED characteristic) + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(proxiedLongRange(0, Integer.MAX_VALUE)). + stream(s -> fs.apply(s.unordered().mapToInt(i -> (int) i))). + resultAsserter(unorderedAsserter()). + exercise(); + } + + @Test(dataProvider = "LongStream.limit") + public void testLongUnorderedSizedNotSubsizedFinite(String description, UnaryOperator fs) { + // Range is [0, Long.MAX_VALUE), splits are not SUBSIZED (proxy clears + // the SUBSIZED characteristic) + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(proxiedLongRange(0, Long.MAX_VALUE)). + stream(s -> fs.apply(s.unordered())). + resultAsserter(unorderedAsserter()). + exercise(); } - public void testInfiniteWithLimitToShortCircuitTerminal() { - Object[] array = Stream.generate(() -> 1).limit(4).toArray(); - assertEquals(4, array.length); - array = Stream.generate(() -> 1).limit(4).filter(i -> true).toArray(); - assertEquals(4, array.length); - List result = Stream.generate(() -> 1).limit(4).collect(Collectors.toList()); - assertEquals(result, Arrays.asList(1, 1, 1, 1)); + @Test(dataProvider = "DoubleStream.limit") + public void testDoubleUnorderedSizedNotSubsizedFinite(String description, UnaryOperator fs) { + // Range is [0, Double.MAX_VALUE), splits are not SUBSIZED (proxy clears + // the SUBSIZED characteristic) + // Such a size will induce out of memory errors for incorrect + // slice implementations + withData(proxiedLongRange(0, 1L << 53)). + stream(s -> fs.apply(s.unordered().mapToDouble(i -> (double) i))). + resultAsserter(unorderedAsserter()). + exercise(); + } + + + // Unordered generation + + @Test(dataProvider = "Stream.limit") + public void testUnorderedGenerator(String description, UnaryOperator> fs) { + // Source is spliterator of infinite size + TestData.OfRef generator = TestData.Factory.ofSupplier( + "[1L, 1L, ...]", () -> Stream.generate(() -> 1L)); + + withData(generator). + stream(s -> fs.apply(s.filter(i -> true).unordered())). + exercise(); + } + + @Test(dataProvider = "IntStream.limit") + public void testIntUnorderedGenerator(String description, UnaryOperator fs) { + // Source is spliterator of infinite size + TestData.OfInt generator = TestData.Factory.ofIntSupplier( + "[1, 1, ...]", () -> IntStream.generate(() -> 1)); + + withData(generator). + stream(s -> fs.apply(s.filter(i -> true).unordered())). + exercise(); + } + + @Test(dataProvider = "LongStream.limit") + public void testLongUnorderedGenerator(String description, UnaryOperator fs) { + // Source is spliterator of infinite size + TestData.OfLong generator = TestData.Factory.ofLongSupplier( + "[1L, 1L, ...]", () -> LongStream.generate(() -> 1)); + + withData(generator). + stream(s -> fs.apply(s.filter(i -> true).unordered())). + exercise(); + } + + @Test(dataProvider = "DoubleStream.limit") + public void testDoubleUnorderedGenerator(String description, UnaryOperator fs) { + // Source is spliterator of infinite size + TestData.OfDouble generator = TestData.Factory.ofDoubleSupplier( + "[1.0, 1.0, ...]", () -> DoubleStream.generate(() -> 1.0)); + + withData(generator). + stream(s -> fs.apply(s.filter(i -> true).unordered())). + exercise(); + } + + + // Unordered iteration + + @Test(dataProvider = "Stream.limit") + public void testUnorderedIteration(String description, UnaryOperator> fs) { + // Source is a right-balanced tree of infinite size + TestData.OfRef iterator = TestData.Factory.ofSupplier( + "[1L, 2L, 3L, ...]", () -> Stream.iterate(1L, i -> i + 1L)); + + // Ref + withData(iterator). + stream(s -> fs.apply(s.unordered())). + resultAsserter(unorderedAsserter()). + exercise(); + } + + @Test(dataProvider = "IntStream.limit") + public void testIntUnorderedIteration(String description, UnaryOperator fs) { + // Source is a right-balanced tree of infinite size + TestData.OfInt iterator = TestData.Factory.ofIntSupplier( + "[1, 2, 3, ...]", () -> IntStream.iterate(1, i -> i + 1)); + + // Ref + withData(iterator). + stream(s -> fs.apply(s.unordered())). + resultAsserter(unorderedAsserter()). + exercise(); + } + + @Test(dataProvider = "LongStream.limit") + public void testLongUnorderedIteration(String description, UnaryOperator fs) { + // Source is a right-balanced tree of infinite size + TestData.OfLong iterator = TestData.Factory.ofLongSupplier( + "[1L, 2L, 3L, ...]", () -> LongStream.iterate(1, i -> i + 1)); + + // Ref + withData(iterator). + stream(s -> fs.apply(s.unordered())). + resultAsserter(unorderedAsserter()). + exercise(); + } + + @Test(dataProvider = "DoubleStream.limit") + public void testDoubleUnorderedIteration(String description, UnaryOperator fs) { + // Source is a right-balanced tree of infinite size + TestData.OfDouble iterator = TestData.Factory.ofDoubleSupplier( + "[1.0, 2.0, 3.0, ...]", () -> DoubleStream.iterate(1, i -> i + 1)); + + // Ref + withData(iterator). + stream(s -> fs.apply(s.unordered())). + resultAsserter(unorderedAsserter()). + exercise(); } }