src/java.base/share/classes/java/util/stream/ReferencePipeline.java
changeset 48593 fca88bbbafb9
parent 47216 71c04702a3d5
--- a/src/java.base/share/classes/java/util/stream/ReferencePipeline.java	Wed Jan 17 17:33:48 2018 +0000
+++ b/src/java.base/share/classes/java/util/stream/ReferencePipeline.java	Thu Dec 21 13:52:20 2017 -0800
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -253,12 +253,14 @@
     @Override
     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
         Objects.requireNonNull(mapper);
-        // We can do better than this, by polling cancellationRequested when stream is infinite
         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                 return new Sink.ChainedReference<P_OUT, R>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -267,11 +269,27 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (Stream<? extends R> result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstream);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstream);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
@@ -280,13 +298,17 @@
     @Override
     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
         Objects.requireNonNull(mapper);
-        // We can do better than this, by polling cancellationRequested when stream is infinite
         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
                     IntConsumer downstreamAsInt = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -295,11 +317,23 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (IntStream result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstreamAsInt);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsInt);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
@@ -308,13 +342,17 @@
     @Override
     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
         Objects.requireNonNull(mapper);
-        // We can do better than this, by polling cancellationRequested when stream is infinite
         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
                     DoubleConsumer downstreamAsDouble = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -323,11 +361,23 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (DoubleStream result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstreamAsDouble);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsDouble);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
@@ -342,7 +392,12 @@
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
                     LongConsumer downstreamAsLong = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -351,11 +406,23 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (LongStream result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstreamAsLong);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsLong);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };