1 /* |
1 /* |
2 * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
296 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
296 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
297 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
297 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
298 @Override |
298 @Override |
299 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
299 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
300 return new Sink.ChainedInt<Integer>(sink) { |
300 return new Sink.ChainedInt<Integer>(sink) { |
|
301 // true if cancellationRequested() has been called |
|
302 boolean cancellationRequestedCalled; |
|
303 |
|
304 // cache the consumer to avoid creation on every accepted element |
|
305 IntConsumer downstreamAsInt = downstream::accept; |
|
306 |
301 @Override |
307 @Override |
302 public void begin(long size) { |
308 public void begin(long size) { |
303 downstream.begin(-1); |
309 downstream.begin(-1); |
304 } |
310 } |
305 |
311 |
306 @Override |
312 @Override |
307 public void accept(int t) { |
313 public void accept(int t) { |
308 try (IntStream result = mapper.apply(t)) { |
314 try (IntStream result = mapper.apply(t)) { |
309 // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it |
315 if (result != null) { |
310 if (result != null) |
316 if (!cancellationRequestedCalled) { |
311 result.sequential().forEach(i -> downstream.accept(i)); |
317 result.sequential().forEach(downstreamAsInt); |
|
318 } |
|
319 else { |
|
320 var s = result.sequential().spliterator(); |
|
321 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt)); |
|
322 } |
|
323 } |
312 } |
324 } |
|
325 } |
|
326 |
|
327 @Override |
|
328 public boolean cancellationRequested() { |
|
329 // If this method is called then an operation within the stream |
|
330 // pipeline is short-circuiting (see AbstractPipeline.copyInto). |
|
331 // Note that we cannot differentiate between an upstream or |
|
332 // downstream operation |
|
333 cancellationRequestedCalled = true; |
|
334 return downstream.cancellationRequested(); |
313 } |
335 } |
314 }; |
336 }; |
315 } |
337 } |
316 }; |
338 }; |
317 } |
339 } |