src/java.base/share/classes/java/util/stream/LongPipeline.java
changeset 48593 fca88bbbafb9
parent 47216 71c04702a3d5
--- a/src/java.base/share/classes/java/util/stream/LongPipeline.java	Wed Jan 17 17:33:48 2018 +0000
+++ b/src/java.base/share/classes/java/util/stream/LongPipeline.java	Thu Dec 21 13:52:20 2017 -0800
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -280,6 +280,12 @@
             @Override
             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
                 return new Sink.ChainedLong<Long>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
+                    LongConsumer downstreamAsLong = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -288,11 +294,27 @@
                     @Override
                     public void accept(long t) {
                         try (LongStream result = mapper.apply(t)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(i -> downstream.accept(i));
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsLong);
+                                }
+                                else {
+                                    var s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };