251 } |
251 } |
252 |
252 |
253 @Override |
253 @Override |
254 public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { |
254 public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { |
255 Objects.requireNonNull(mapper); |
255 Objects.requireNonNull(mapper); |
256 // We can do better than this, by polling cancellationRequested when stream is infinite |
|
257 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, |
256 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, |
258 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
257 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
259 @Override |
258 @Override |
260 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { |
259 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { |
261 return new Sink.ChainedReference<P_OUT, R>(sink) { |
260 return new Sink.ChainedReference<P_OUT, R>(sink) { |
|
261 // true if cancellationRequested() has been called |
|
262 boolean cancellationRequestedCalled; |
|
263 |
262 @Override |
264 @Override |
263 public void begin(long size) { |
265 public void begin(long size) { |
264 downstream.begin(-1); |
266 downstream.begin(-1); |
265 } |
267 } |
266 |
268 |
267 @Override |
269 @Override |
268 public void accept(P_OUT u) { |
270 public void accept(P_OUT u) { |
269 try (Stream<? extends R> result = mapper.apply(u)) { |
271 try (Stream<? extends R> result = mapper.apply(u)) { |
270 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it |
272 if (result != null) { |
271 if (result != null) |
273 if (!cancellationRequestedCalled) { |
272 result.sequential().forEach(downstream); |
274 result.sequential().forEach(downstream); |
|
275 } |
|
276 else { |
|
277 var s = result.sequential().spliterator(); |
|
278 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream)); |
|
279 } |
|
280 } |
273 } |
281 } |
|
282 } |
|
283 |
|
284 @Override |
|
285 public boolean cancellationRequested() { |
|
286 // If this method is called then an operation within the stream |
|
287 // pipeline is short-circuiting (see AbstractPipeline.copyInto). |
|
288 // Note that we cannot differentiate between an upstream or |
|
289 // downstream operation |
|
290 cancellationRequestedCalled = true; |
|
291 return downstream.cancellationRequested(); |
274 } |
292 } |
275 }; |
293 }; |
276 } |
294 } |
277 }; |
295 }; |
278 } |
296 } |
279 |
297 |
280 @Override |
298 @Override |
281 public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) { |
299 public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) { |
282 Objects.requireNonNull(mapper); |
300 Objects.requireNonNull(mapper); |
283 // We can do better than this, by polling cancellationRequested when stream is infinite |
|
284 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, |
301 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, |
285 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
302 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
286 @Override |
303 @Override |
287 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { |
304 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { |
288 return new Sink.ChainedReference<P_OUT, Integer>(sink) { |
305 return new Sink.ChainedReference<P_OUT, Integer>(sink) { |
|
306 // true if cancellationRequested() has been called |
|
307 boolean cancellationRequestedCalled; |
|
308 |
|
309 // cache the consumer to avoid creation on every accepted element |
289 IntConsumer downstreamAsInt = downstream::accept; |
310 IntConsumer downstreamAsInt = downstream::accept; |
|
311 |
290 @Override |
312 @Override |
291 public void begin(long size) { |
313 public void begin(long size) { |
292 downstream.begin(-1); |
314 downstream.begin(-1); |
293 } |
315 } |
294 |
316 |
295 @Override |
317 @Override |
296 public void accept(P_OUT u) { |
318 public void accept(P_OUT u) { |
297 try (IntStream result = mapper.apply(u)) { |
319 try (IntStream result = mapper.apply(u)) { |
298 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it |
320 if (result != null) { |
299 if (result != null) |
321 if (!cancellationRequestedCalled) { |
300 result.sequential().forEach(downstreamAsInt); |
322 result.sequential().forEach(downstreamAsInt); |
|
323 } |
|
324 else { |
|
325 var s = result.sequential().spliterator(); |
|
326 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt)); |
|
327 } |
|
328 } |
301 } |
329 } |
|
330 } |
|
331 |
|
332 @Override |
|
333 public boolean cancellationRequested() { |
|
334 cancellationRequestedCalled = true; |
|
335 return downstream.cancellationRequested(); |
302 } |
336 } |
303 }; |
337 }; |
304 } |
338 } |
305 }; |
339 }; |
306 } |
340 } |
307 |
341 |
308 @Override |
342 @Override |
309 public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) { |
343 public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) { |
310 Objects.requireNonNull(mapper); |
344 Objects.requireNonNull(mapper); |
311 // We can do better than this, by polling cancellationRequested when stream is infinite |
|
312 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, |
345 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, |
313 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
346 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
314 @Override |
347 @Override |
315 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { |
348 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { |
316 return new Sink.ChainedReference<P_OUT, Double>(sink) { |
349 return new Sink.ChainedReference<P_OUT, Double>(sink) { |
|
350 // true if cancellationRequested() has been called |
|
351 boolean cancellationRequestedCalled; |
|
352 |
|
353 // cache the consumer to avoid creation on every accepted element |
317 DoubleConsumer downstreamAsDouble = downstream::accept; |
354 DoubleConsumer downstreamAsDouble = downstream::accept; |
|
355 |
318 @Override |
356 @Override |
319 public void begin(long size) { |
357 public void begin(long size) { |
320 downstream.begin(-1); |
358 downstream.begin(-1); |
321 } |
359 } |
322 |
360 |
323 @Override |
361 @Override |
324 public void accept(P_OUT u) { |
362 public void accept(P_OUT u) { |
325 try (DoubleStream result = mapper.apply(u)) { |
363 try (DoubleStream result = mapper.apply(u)) { |
326 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it |
364 if (result != null) { |
327 if (result != null) |
365 if (!cancellationRequestedCalled) { |
328 result.sequential().forEach(downstreamAsDouble); |
366 result.sequential().forEach(downstreamAsDouble); |
|
367 } |
|
368 else { |
|
369 var s = result.sequential().spliterator(); |
|
370 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble)); |
|
371 } |
|
372 } |
329 } |
373 } |
|
374 } |
|
375 |
|
376 @Override |
|
377 public boolean cancellationRequested() { |
|
378 cancellationRequestedCalled = true; |
|
379 return downstream.cancellationRequested(); |
330 } |
380 } |
331 }; |
381 }; |
332 } |
382 } |
333 }; |
383 }; |
334 } |
384 } |
340 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, |
390 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, |
341 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
391 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
342 @Override |
392 @Override |
343 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { |
393 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { |
344 return new Sink.ChainedReference<P_OUT, Long>(sink) { |
394 return new Sink.ChainedReference<P_OUT, Long>(sink) { |
|
395 // true if cancellationRequested() has been called |
|
396 boolean cancellationRequestedCalled; |
|
397 |
|
398 // cache the consumer to avoid creation on every accepted element |
345 LongConsumer downstreamAsLong = downstream::accept; |
399 LongConsumer downstreamAsLong = downstream::accept; |
|
400 |
346 @Override |
401 @Override |
347 public void begin(long size) { |
402 public void begin(long size) { |
348 downstream.begin(-1); |
403 downstream.begin(-1); |
349 } |
404 } |
350 |
405 |
351 @Override |
406 @Override |
352 public void accept(P_OUT u) { |
407 public void accept(P_OUT u) { |
353 try (LongStream result = mapper.apply(u)) { |
408 try (LongStream result = mapper.apply(u)) { |
354 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it |
409 if (result != null) { |
355 if (result != null) |
410 if (!cancellationRequestedCalled) { |
356 result.sequential().forEach(downstreamAsLong); |
411 result.sequential().forEach(downstreamAsLong); |
|
412 } |
|
413 else { |
|
414 var s = result.sequential().spliterator(); |
|
415 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong)); |
|
416 } |
|
417 } |
357 } |
418 } |
|
419 } |
|
420 |
|
421 @Override |
|
422 public boolean cancellationRequested() { |
|
423 cancellationRequestedCalled = true; |
|
424 return downstream.cancellationRequested(); |
358 } |
425 } |
359 }; |
426 }; |
360 } |
427 } |
361 }; |
428 }; |
362 } |
429 } |