equal
deleted
inserted
replaced
1 /* |
1 /* |
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
302 * parallel pipelines. |
302 * parallel pipelines. |
303 */ |
303 */ |
304 private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> { |
304 private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> { |
305 protected final Comparator<? super T> comparator; |
305 protected final Comparator<? super T> comparator; |
306 // @@@ could be a lazy final value, if/when support is added |
306 // @@@ could be a lazy final value, if/when support is added |
307 protected boolean cancellationWasRequested; |
307 // true if cancellationRequested() has been called |
|
308 protected boolean cancellationRequestedCalled; |
308 |
309 |
309 AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) { |
310 AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) { |
310 super(downstream); |
311 super(downstream); |
311 this.comparator = comparator; |
312 this.comparator = comparator; |
312 } |
313 } |
317 * |
318 * |
318 * @return false, as this sink never short-circuits. |
319 * @return false, as this sink never short-circuits. |
319 */ |
320 */ |
320 @Override |
321 @Override |
321 public final boolean cancellationRequested() { |
322 public final boolean cancellationRequested() { |
322 cancellationWasRequested = true; |
323 // If this method is called then an operation within the stream |
|
324 // pipeline is short-circuiting (see AbstractPipeline.copyInto). |
|
325 // Note that we cannot differentiate between an upstream or |
|
326 // downstream operation |
|
327 cancellationRequestedCalled = true; |
323 return false; |
328 return false; |
324 } |
329 } |
325 } |
330 } |
326 |
331 |
327 /** |
332 /** |
345 |
350 |
346 @Override |
351 @Override |
347 public void end() { |
352 public void end() { |
348 Arrays.sort(array, 0, offset, comparator); |
353 Arrays.sort(array, 0, offset, comparator); |
349 downstream.begin(offset); |
354 downstream.begin(offset); |
350 if (!cancellationWasRequested) { |
355 if (!cancellationRequestedCalled) { |
351 for (int i = 0; i < offset; i++) |
356 for (int i = 0; i < offset; i++) |
352 downstream.accept(array[i]); |
357 downstream.accept(array[i]); |
353 } |
358 } |
354 else { |
359 else { |
355 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
360 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
384 |
389 |
385 @Override |
390 @Override |
386 public void end() { |
391 public void end() { |
387 list.sort(comparator); |
392 list.sort(comparator); |
388 downstream.begin(list.size()); |
393 downstream.begin(list.size()); |
389 if (!cancellationWasRequested) { |
394 if (!cancellationRequestedCalled) { |
390 list.forEach(downstream::accept); |
395 list.forEach(downstream::accept); |
391 } |
396 } |
392 else { |
397 else { |
393 for (T t : list) { |
398 for (T t : list) { |
394 if (downstream.cancellationRequested()) break; |
399 if (downstream.cancellationRequested()) break; |
407 |
412 |
408 /** |
413 /** |
409 * Abstract {@link Sink} for implementing sort on int streams. |
414 * Abstract {@link Sink} for implementing sort on int streams. |
410 */ |
415 */ |
411 private abstract static class AbstractIntSortingSink extends Sink.ChainedInt<Integer> { |
416 private abstract static class AbstractIntSortingSink extends Sink.ChainedInt<Integer> { |
412 protected boolean cancellationWasRequested; |
417 // true if cancellationRequested() has been called |
|
418 protected boolean cancellationRequestedCalled; |
413 |
419 |
414 AbstractIntSortingSink(Sink<? super Integer> downstream) { |
420 AbstractIntSortingSink(Sink<? super Integer> downstream) { |
415 super(downstream); |
421 super(downstream); |
416 } |
422 } |
417 |
423 |
418 @Override |
424 @Override |
419 public final boolean cancellationRequested() { |
425 public final boolean cancellationRequested() { |
420 cancellationWasRequested = true; |
426 cancellationRequestedCalled = true; |
421 return false; |
427 return false; |
422 } |
428 } |
423 } |
429 } |
424 |
430 |
425 /** |
431 /** |
442 |
448 |
443 @Override |
449 @Override |
444 public void end() { |
450 public void end() { |
445 Arrays.sort(array, 0, offset); |
451 Arrays.sort(array, 0, offset); |
446 downstream.begin(offset); |
452 downstream.begin(offset); |
447 if (!cancellationWasRequested) { |
453 if (!cancellationRequestedCalled) { |
448 for (int i = 0; i < offset; i++) |
454 for (int i = 0; i < offset; i++) |
449 downstream.accept(array[i]); |
455 downstream.accept(array[i]); |
450 } |
456 } |
451 else { |
457 else { |
452 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
458 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
482 @Override |
488 @Override |
483 public void end() { |
489 public void end() { |
484 int[] ints = b.asPrimitiveArray(); |
490 int[] ints = b.asPrimitiveArray(); |
485 Arrays.sort(ints); |
491 Arrays.sort(ints); |
486 downstream.begin(ints.length); |
492 downstream.begin(ints.length); |
487 if (!cancellationWasRequested) { |
493 if (!cancellationRequestedCalled) { |
488 for (int anInt : ints) |
494 for (int anInt : ints) |
489 downstream.accept(anInt); |
495 downstream.accept(anInt); |
490 } |
496 } |
491 else { |
497 else { |
492 for (int anInt : ints) { |
498 for (int anInt : ints) { |
505 |
511 |
506 /** |
512 /** |
507 * Abstract {@link Sink} for implementing sort on long streams. |
513 * Abstract {@link Sink} for implementing sort on long streams. |
508 */ |
514 */ |
509 private abstract static class AbstractLongSortingSink extends Sink.ChainedLong<Long> { |
515 private abstract static class AbstractLongSortingSink extends Sink.ChainedLong<Long> { |
510 protected boolean cancellationWasRequested; |
516 // true if cancellationRequested() has been called |
|
517 protected boolean cancellationRequestedCalled; |
511 |
518 |
512 AbstractLongSortingSink(Sink<? super Long> downstream) { |
519 AbstractLongSortingSink(Sink<? super Long> downstream) { |
513 super(downstream); |
520 super(downstream); |
514 } |
521 } |
515 |
522 |
516 @Override |
523 @Override |
517 public final boolean cancellationRequested() { |
524 public final boolean cancellationRequested() { |
518 cancellationWasRequested = true; |
525 cancellationRequestedCalled = true; |
519 return false; |
526 return false; |
520 } |
527 } |
521 } |
528 } |
522 |
529 |
523 /** |
530 /** |
540 |
547 |
541 @Override |
548 @Override |
542 public void end() { |
549 public void end() { |
543 Arrays.sort(array, 0, offset); |
550 Arrays.sort(array, 0, offset); |
544 downstream.begin(offset); |
551 downstream.begin(offset); |
545 if (!cancellationWasRequested) { |
552 if (!cancellationRequestedCalled) { |
546 for (int i = 0; i < offset; i++) |
553 for (int i = 0; i < offset; i++) |
547 downstream.accept(array[i]); |
554 downstream.accept(array[i]); |
548 } |
555 } |
549 else { |
556 else { |
550 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
557 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
580 @Override |
587 @Override |
581 public void end() { |
588 public void end() { |
582 long[] longs = b.asPrimitiveArray(); |
589 long[] longs = b.asPrimitiveArray(); |
583 Arrays.sort(longs); |
590 Arrays.sort(longs); |
584 downstream.begin(longs.length); |
591 downstream.begin(longs.length); |
585 if (!cancellationWasRequested) { |
592 if (!cancellationRequestedCalled) { |
586 for (long aLong : longs) |
593 for (long aLong : longs) |
587 downstream.accept(aLong); |
594 downstream.accept(aLong); |
588 } |
595 } |
589 else { |
596 else { |
590 for (long aLong : longs) { |
597 for (long aLong : longs) { |
603 |
610 |
604 /** |
611 /** |
605 * Abstract {@link Sink} for implementing sort on long streams. |
612 * Abstract {@link Sink} for implementing sort on long streams. |
606 */ |
613 */ |
607 private abstract static class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> { |
614 private abstract static class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> { |
608 protected boolean cancellationWasRequested; |
615 // true if cancellationRequested() has been called |
|
616 protected boolean cancellationRequestedCalled; |
609 |
617 |
610 AbstractDoubleSortingSink(Sink<? super Double> downstream) { |
618 AbstractDoubleSortingSink(Sink<? super Double> downstream) { |
611 super(downstream); |
619 super(downstream); |
612 } |
620 } |
613 |
621 |
614 @Override |
622 @Override |
615 public final boolean cancellationRequested() { |
623 public final boolean cancellationRequested() { |
616 cancellationWasRequested = true; |
624 cancellationRequestedCalled = true; |
617 return false; |
625 return false; |
618 } |
626 } |
619 } |
627 } |
620 |
628 |
621 /** |
629 /** |
638 |
646 |
639 @Override |
647 @Override |
640 public void end() { |
648 public void end() { |
641 Arrays.sort(array, 0, offset); |
649 Arrays.sort(array, 0, offset); |
642 downstream.begin(offset); |
650 downstream.begin(offset); |
643 if (!cancellationWasRequested) { |
651 if (!cancellationRequestedCalled) { |
644 for (int i = 0; i < offset; i++) |
652 for (int i = 0; i < offset; i++) |
645 downstream.accept(array[i]); |
653 downstream.accept(array[i]); |
646 } |
654 } |
647 else { |
655 else { |
648 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
656 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
678 @Override |
686 @Override |
679 public void end() { |
687 public void end() { |
680 double[] doubles = b.asPrimitiveArray(); |
688 double[] doubles = b.asPrimitiveArray(); |
681 Arrays.sort(doubles); |
689 Arrays.sort(doubles); |
682 downstream.begin(doubles.length); |
690 downstream.begin(doubles.length); |
683 if (!cancellationWasRequested) { |
691 if (!cancellationRequestedCalled) { |
684 for (double aDouble : doubles) |
692 for (double aDouble : doubles) |
685 downstream.accept(aDouble); |
693 downstream.accept(aDouble); |
686 } |
694 } |
687 else { |
695 else { |
688 for (double aDouble : doubles) { |
696 for (double aDouble : doubles) { |