8075939: Stream.flatMap() causes breaking of short-circuiting of terminal operations
authorpsandoz
Thu, 21 Dec 2017 13:52:20 -0800
changeset 48593 fca88bbbafb9
parent 48592 ef70df777355
child 48594 4e4929530412
8075939: Stream.flatMap() causes breaking of short-circuiting of terminal operations Reviewed-by: forax, smarks
src/java.base/share/classes/java/util/stream/DoublePipeline.java
src/java.base/share/classes/java/util/stream/IntPipeline.java
src/java.base/share/classes/java/util/stream/LongPipeline.java
src/java.base/share/classes/java/util/stream/ReferencePipeline.java
src/java.base/share/classes/java/util/stream/SortedOps.java
test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java
--- a/src/java.base/share/classes/java/util/stream/DoublePipeline.java	Wed Jan 17 17:33:48 2018 +0000
+++ b/src/java.base/share/classes/java/util/stream/DoublePipeline.java	Thu Dec 21 13:52:20 2017 -0800
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -40,7 +40,6 @@
 import java.util.function.DoubleToLongFunction;
 import java.util.function.DoubleUnaryOperator;
 import java.util.function.IntFunction;
-import java.util.function.LongPredicate;
 import java.util.function.ObjDoubleConsumer;
 import java.util.function.Supplier;
 
@@ -265,6 +264,12 @@
             @Override
             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                 return new Sink.ChainedDouble<Double>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
+                    DoubleConsumer downstreamAsDouble = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -273,11 +278,27 @@
                     @Override
                     public void accept(double t) {
                         try (DoubleStream result = mapper.apply(t)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(i -> downstream.accept(i));
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsDouble);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
--- a/src/java.base/share/classes/java/util/stream/IntPipeline.java	Wed Jan 17 17:33:48 2018 +0000
+++ b/src/java.base/share/classes/java/util/stream/IntPipeline.java	Thu Dec 21 13:52:20 2017 -0800
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -298,6 +298,12 @@
             @Override
             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
                 return new Sink.ChainedInt<Integer>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
+                    IntConsumer downstreamAsInt = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -306,11 +312,27 @@
                     @Override
                     public void accept(int t) {
                         try (IntStream result = mapper.apply(t)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(i -> downstream.accept(i));
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsInt);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
--- a/src/java.base/share/classes/java/util/stream/LongPipeline.java	Wed Jan 17 17:33:48 2018 +0000
+++ b/src/java.base/share/classes/java/util/stream/LongPipeline.java	Thu Dec 21 13:52:20 2017 -0800
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -280,6 +280,12 @@
             @Override
             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
                 return new Sink.ChainedLong<Long>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
+                    LongConsumer downstreamAsLong = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -288,11 +294,27 @@
                     @Override
                     public void accept(long t) {
                         try (LongStream result = mapper.apply(t)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(i -> downstream.accept(i));
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsLong);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
--- a/src/java.base/share/classes/java/util/stream/ReferencePipeline.java	Wed Jan 17 17:33:48 2018 +0000
+++ b/src/java.base/share/classes/java/util/stream/ReferencePipeline.java	Thu Dec 21 13:52:20 2017 -0800
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -253,12 +253,14 @@
     @Override
     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
         Objects.requireNonNull(mapper);
-        // We can do better than this, by polling cancellationRequested when stream is infinite
         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                 return new Sink.ChainedReference<P_OUT, R>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -267,11 +269,27 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (Stream<? extends R> result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstream);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstream);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
@@ -280,13 +298,17 @@
     @Override
     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
         Objects.requireNonNull(mapper);
-        // We can do better than this, by polling cancellationRequested when stream is infinite
         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
                     IntConsumer downstreamAsInt = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -295,11 +317,23 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (IntStream result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstreamAsInt);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsInt);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
@@ -308,13 +342,17 @@
     @Override
     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
         Objects.requireNonNull(mapper);
-        // We can do better than this, by polling cancellationRequested when stream is infinite
         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
                     DoubleConsumer downstreamAsDouble = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -323,11 +361,23 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (DoubleStream result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstreamAsDouble);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsDouble);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
@@ -342,7 +392,12 @@
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
                     LongConsumer downstreamAsLong = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -351,11 +406,23 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (LongStream result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstreamAsLong);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsLong);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
--- a/src/java.base/share/classes/java/util/stream/SortedOps.java	Wed Jan 17 17:33:48 2018 +0000
+++ b/src/java.base/share/classes/java/util/stream/SortedOps.java	Thu Dec 21 13:52:20 2017 -0800
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -304,7 +304,8 @@
     private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
         protected final Comparator<? super T> comparator;
         // @@@ could be a lazy final value, if/when support is added
-        protected boolean cancellationWasRequested;
+        // true if cancellationRequested() has been called
+        protected boolean cancellationRequestedCalled;
 
         AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
             super(downstream);
@@ -319,7 +320,11 @@
          */
         @Override
         public final boolean cancellationRequested() {
-            cancellationWasRequested = true;
+            // If this method is called then an operation within the stream
+            // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+            // Note that we cannot differentiate between an upstream or
+            // downstream operation
+            cancellationRequestedCalled = true;
             return false;
         }
     }
@@ -347,7 +352,7 @@
         public void end() {
             Arrays.sort(array, 0, offset, comparator);
             downstream.begin(offset);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int i = 0; i < offset; i++)
                     downstream.accept(array[i]);
             }
@@ -386,7 +391,7 @@
         public void end() {
             list.sort(comparator);
             downstream.begin(list.size());
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 list.forEach(downstream::accept);
             }
             else {
@@ -409,7 +414,8 @@
      * Abstract {@link Sink} for implementing sort on int streams.
      */
     private abstract static class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
-        protected boolean cancellationWasRequested;
+        // true if cancellationRequested() has been called
+        protected boolean cancellationRequestedCalled;
 
         AbstractIntSortingSink(Sink<? super Integer> downstream) {
             super(downstream);
@@ -417,7 +423,7 @@
 
         @Override
         public final boolean cancellationRequested() {
-            cancellationWasRequested = true;
+            cancellationRequestedCalled = true;
             return false;
         }
     }
@@ -444,7 +450,7 @@
         public void end() {
             Arrays.sort(array, 0, offset);
             downstream.begin(offset);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int i = 0; i < offset; i++)
                     downstream.accept(array[i]);
             }
@@ -484,7 +490,7 @@
             int[] ints = b.asPrimitiveArray();
             Arrays.sort(ints);
             downstream.begin(ints.length);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int anInt : ints)
                     downstream.accept(anInt);
             }
@@ -507,7 +513,8 @@
      * Abstract {@link Sink} for implementing sort on long streams.
      */
     private abstract static class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
-        protected boolean cancellationWasRequested;
+        // true if cancellationRequested() has been called
+        protected boolean cancellationRequestedCalled;
 
         AbstractLongSortingSink(Sink<? super Long> downstream) {
             super(downstream);
@@ -515,7 +522,7 @@
 
         @Override
         public final boolean cancellationRequested() {
-            cancellationWasRequested = true;
+            cancellationRequestedCalled = true;
             return false;
         }
     }
@@ -542,7 +549,7 @@
         public void end() {
             Arrays.sort(array, 0, offset);
             downstream.begin(offset);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int i = 0; i < offset; i++)
                     downstream.accept(array[i]);
             }
@@ -582,7 +589,7 @@
             long[] longs = b.asPrimitiveArray();
             Arrays.sort(longs);
             downstream.begin(longs.length);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (long aLong : longs)
                     downstream.accept(aLong);
             }
@@ -605,7 +612,8 @@
      * Abstract {@link Sink} for implementing sort on long streams.
      */
     private abstract static class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
-        protected boolean cancellationWasRequested;
+        // true if cancellationRequested() has been called
+        protected boolean cancellationRequestedCalled;
 
         AbstractDoubleSortingSink(Sink<? super Double> downstream) {
             super(downstream);
@@ -613,7 +621,7 @@
 
         @Override
         public final boolean cancellationRequested() {
-            cancellationWasRequested = true;
+            cancellationRequestedCalled = true;
             return false;
         }
     }
@@ -640,7 +648,7 @@
         public void end() {
             Arrays.sort(array, 0, offset);
             downstream.begin(offset);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int i = 0; i < offset; i++)
                     downstream.accept(array[i]);
             }
@@ -680,7 +688,7 @@
             double[] doubles = b.asPrimitiveArray();
             Arrays.sort(doubles);
             downstream.begin(doubles.length);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (double aDouble : doubles)
                     downstream.accept(aDouble);
             }
--- a/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java	Wed Jan 17 17:33:48 2018 +0000
+++ b/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java	Thu Dec 21 13:52:20 2017 -0800
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -24,7 +24,7 @@
 /*
  * @test
  * @summary flat-map operations
- * @bug 8044047 8076458
+ * @bug 8044047 8076458 8075939
  */
 
 package org.openjdk.tests.java.util.stream;
@@ -54,6 +54,7 @@
 @Test
 public class FlatMapOpTest extends OpTestCase {
 
+    @Test
     public void testNullMapper() {
         checkNPE(() -> Stream.of(1).flatMap(null));
         checkNPE(() -> IntStream.of(1).flatMap(null));
@@ -64,6 +65,7 @@
     static final Function<Integer, Stream<Integer>> integerRangeMapper
             = e -> IntStream.range(0, e).boxed();
 
+    @Test
     public void testFlatMap() {
         String[] stringsArray = {"hello", "there", "", "yada"};
         Stream<String> strings = Arrays.asList(stringsArray).stream();
@@ -149,11 +151,24 @@
         exerciseOps(data, s -> s.flatMap((Integer e) -> IntStream.range(0, e).boxed().limit(10)));
     }
 
+    @Test
+    public void testOpsShortCircuit() {
+        AtomicInteger count = new AtomicInteger();
+        Stream.of(0).flatMap(i -> IntStream.range(0, 100).boxed()).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+    }
+
     //
 
     @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
     public void testIntOps(String name, TestData.OfInt data) {
-        Collection<Integer> result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToInt(j -> j)));
+        Collection<Integer> result = exerciseOps(data, s -> s.flatMap(IntStream::of));
+        assertEquals(data.size(), result.size());
+        assertContents(data, result);
+
+        result = exerciseOps(data, s -> s.boxed().flatMapToInt(IntStream::of));
         assertEquals(data.size(), result.size());
         assertContents(data, result);
 
@@ -165,13 +180,35 @@
     public void testIntOpsX(String name, TestData.OfInt data) {
         exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, e)));
         exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, e).limit(10)));
+
+        exerciseOps(data, s -> s.boxed().flatMapToInt(e -> IntStream.range(0, e)));
+        exerciseOps(data, s -> s.boxed().flatMapToInt(e -> IntStream.range(0, e).limit(10)));
+    }
+
+    @Test
+    public void testIntOpsShortCircuit() {
+        AtomicInteger count = new AtomicInteger();
+        IntStream.of(0).flatMap(i -> IntStream.range(0, 100)).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+
+        count.set(0);
+        Stream.of(0).flatMapToInt(i -> IntStream.range(0, 100)).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
     }
 
     //
 
     @Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class)
     public void testLongOps(String name, TestData.OfLong data) {
-        Collection<Long> result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToLong(j -> j)));
+        Collection<Long> result = exerciseOps(data, s -> s.flatMap(LongStream::of));
+        assertEquals(data.size(), result.size());
+        assertContents(data, result);
+
+        result = exerciseOps(data, s -> s.boxed().flatMapToLong(LongStream::of));
         assertEquals(data.size(), result.size());
         assertContents(data, result);
 
@@ -185,11 +222,30 @@
         exerciseOps(data, s -> s.flatMap(e -> LongStream.range(0, e).limit(10)));
     }
 
+    @Test
+    public void testLongOpsShortCircuit() {
+        AtomicInteger count = new AtomicInteger();
+        LongStream.of(0).flatMap(i -> LongStream.range(0, 100)).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+
+        count.set(0);
+        Stream.of(0).flatMapToLong(i -> LongStream.range(0, 100)).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+    }
+
     //
 
     @Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class)
     public void testDoubleOps(String name, TestData.OfDouble data) {
-        Collection<Double> result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToDouble(j -> j)));
+        Collection<Double> result = exerciseOps(data, s -> s.flatMap(DoubleStream::of));
+        assertEquals(data.size(), result.size());
+        assertContents(data, result);
+
+        result = exerciseOps(data, s -> s.boxed().flatMapToDouble(DoubleStream::of));
         assertEquals(data.size(), result.size());
         assertContents(data, result);
 
@@ -202,4 +258,19 @@
         exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, (int) e).asDoubleStream()));
         exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, (int) e).limit(10).asDoubleStream()));
     }
+
+    @Test
+    public void testDoubleOpsShortCircuit() {
+        AtomicInteger count = new AtomicInteger();
+        DoubleStream.of(0).flatMap(i -> IntStream.range(0, 100).asDoubleStream()).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+
+        count.set(0);
+        Stream.of(0).flatMapToDouble(i -> IntStream.range(0, 100).asDoubleStream()).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+    }
 }