277 } |
277 } |
278 } |
278 } |
279 } |
279 } |
280 |
280 |
281 /** |
281 /** |
|
282 * Abstract {@link Sink} for implementing sort on reference streams. |
|
283 * |
|
284 * <p> |
|
285 * Note: documentation below applies to reference and all primitive sinks. |
|
286 * <p> |
|
287 * Sorting sinks first accept all elements, buffering then into an array |
|
288 * or a re-sizable data structure, if the size of the pipeline is known or |
|
289 * unknown respectively. At the end of the sink protocol those elements are |
|
290 * sorted and then pushed downstream. |
|
291 * This class records if {@link #cancellationRequested} is called. If so it |
|
292 * can be inferred that the source pushing source elements into the pipeline |
|
293 * knows that the pipeline is short-circuiting. In such cases sub-classes |
|
294 * pushing elements downstream will preserve the short-circuiting protocol |
|
295 * by calling {@code downstream.cancellationRequested()} and checking the |
|
296 * result is {@code false} before an element is pushed. |
|
297 * <p> |
|
298 * Note that the above behaviour is an optimization for sorting with |
|
299 * sequential streams. It is not an error that more elements, than strictly |
|
300 * required to produce a result, may flow through the pipeline. This can |
|
301 * occur, in general (not restricted to just sorting), for short-circuiting |
|
302 * parallel pipelines. |
|
303 */ |
|
304 private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> { |
|
305 protected final Comparator<? super T> comparator; |
|
306 // @@@ could be a lazy final value, if/when support is added |
|
307 protected boolean cancellationWasRequested; |
|
308 |
|
309 AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) { |
|
310 super(downstream); |
|
311 this.comparator = comparator; |
|
312 } |
|
313 |
|
314 /** |
|
315 * Records is cancellation is requested so short-circuiting behaviour |
|
316 * can be preserved when the sorted elements are pushed downstream. |
|
317 * |
|
318 * @return false, as this sink never short-circuits. |
|
319 */ |
|
320 @Override |
|
321 public final boolean cancellationRequested() { |
|
322 cancellationWasRequested = true; |
|
323 return false; |
|
324 } |
|
325 } |
|
326 |
|
327 /** |
282 * {@link Sink} for implementing sort on SIZED reference streams. |
328 * {@link Sink} for implementing sort on SIZED reference streams. |
283 */ |
329 */ |
284 private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T, T> { |
330 private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> { |
285 private final Comparator<? super T> comparator; |
|
286 private T[] array; |
331 private T[] array; |
287 private int offset; |
332 private int offset; |
288 |
333 |
289 SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { |
334 SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { |
290 super(sink); |
335 super(sink, comparator); |
291 this.comparator = comparator; |
|
292 } |
336 } |
293 |
337 |
294 @Override |
338 @Override |
295 @SuppressWarnings("unchecked") |
339 @SuppressWarnings("unchecked") |
296 public void begin(long size) { |
340 public void begin(long size) { |
301 |
345 |
302 @Override |
346 @Override |
303 public void end() { |
347 public void end() { |
304 Arrays.sort(array, 0, offset, comparator); |
348 Arrays.sort(array, 0, offset, comparator); |
305 downstream.begin(offset); |
349 downstream.begin(offset); |
306 for (int i = 0; i < offset; i++) |
350 if (!cancellationWasRequested) { |
307 downstream.accept(array[i]); |
351 for (int i = 0; i < offset; i++) |
|
352 downstream.accept(array[i]); |
|
353 } |
|
354 else { |
|
355 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
|
356 downstream.accept(array[i]); |
|
357 } |
308 downstream.end(); |
358 downstream.end(); |
309 array = null; |
359 array = null; |
310 } |
360 } |
311 |
361 |
312 @Override |
362 @Override |
316 } |
366 } |
317 |
367 |
318 /** |
368 /** |
319 * {@link Sink} for implementing sort on reference streams. |
369 * {@link Sink} for implementing sort on reference streams. |
320 */ |
370 */ |
321 private static final class RefSortingSink<T> extends Sink.ChainedReference<T, T> { |
371 private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { |
322 private final Comparator<? super T> comparator; |
|
323 private ArrayList<T> list; |
372 private ArrayList<T> list; |
324 |
373 |
325 RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { |
374 RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { |
326 super(sink); |
375 super(sink, comparator); |
327 this.comparator = comparator; |
|
328 } |
376 } |
329 |
377 |
330 @Override |
378 @Override |
331 public void begin(long size) { |
379 public void begin(long size) { |
332 if (size >= Nodes.MAX_ARRAY_SIZE) |
380 if (size >= Nodes.MAX_ARRAY_SIZE) |
348 list.add(t); |
404 list.add(t); |
349 } |
405 } |
350 } |
406 } |
351 |
407 |
352 /** |
408 /** |
|
409 * Abstract {@link Sink} for implementing sort on int streams. |
|
410 */ |
|
411 private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> { |
|
412 protected boolean cancellationWasRequested; |
|
413 |
|
414 AbstractIntSortingSink(Sink<? super Integer> downstream) { |
|
415 super(downstream); |
|
416 } |
|
417 |
|
418 @Override |
|
419 public final boolean cancellationRequested() { |
|
420 cancellationWasRequested = true; |
|
421 return false; |
|
422 } |
|
423 } |
|
424 |
|
425 /** |
353 * {@link Sink} for implementing sort on SIZED int streams. |
426 * {@link Sink} for implementing sort on SIZED int streams. |
354 */ |
427 */ |
355 private static final class SizedIntSortingSink extends Sink.ChainedInt<Integer> { |
428 private static final class SizedIntSortingSink extends AbstractIntSortingSink { |
356 private int[] array; |
429 private int[] array; |
357 private int offset; |
430 private int offset; |
358 |
431 |
359 SizedIntSortingSink(Sink<? super Integer> downstream) { |
432 SizedIntSortingSink(Sink<? super Integer> downstream) { |
360 super(downstream); |
433 super(downstream); |
369 |
442 |
370 @Override |
443 @Override |
371 public void end() { |
444 public void end() { |
372 Arrays.sort(array, 0, offset); |
445 Arrays.sort(array, 0, offset); |
373 downstream.begin(offset); |
446 downstream.begin(offset); |
374 for (int i = 0; i < offset; i++) |
447 if (!cancellationWasRequested) { |
375 downstream.accept(array[i]); |
448 for (int i = 0; i < offset; i++) |
|
449 downstream.accept(array[i]); |
|
450 } |
|
451 else { |
|
452 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
|
453 downstream.accept(array[i]); |
|
454 } |
376 downstream.end(); |
455 downstream.end(); |
377 array = null; |
456 array = null; |
378 } |
457 } |
379 |
458 |
380 @Override |
459 @Override |
403 @Override |
482 @Override |
404 public void end() { |
483 public void end() { |
405 int[] ints = b.asPrimitiveArray(); |
484 int[] ints = b.asPrimitiveArray(); |
406 Arrays.sort(ints); |
485 Arrays.sort(ints); |
407 downstream.begin(ints.length); |
486 downstream.begin(ints.length); |
408 for (int anInt : ints) |
487 if (!cancellationWasRequested) { |
409 downstream.accept(anInt); |
488 for (int anInt : ints) |
|
489 downstream.accept(anInt); |
|
490 } |
|
491 else { |
|
492 for (int anInt : ints) { |
|
493 if (downstream.cancellationRequested()) break; |
|
494 downstream.accept(anInt); |
|
495 } |
|
496 } |
410 downstream.end(); |
497 downstream.end(); |
411 } |
498 } |
412 |
499 |
413 @Override |
500 @Override |
414 public void accept(int t) { |
501 public void accept(int t) { |
415 b.accept(t); |
502 b.accept(t); |
416 } |
503 } |
417 } |
504 } |
418 |
505 |
419 /** |
506 /** |
|
507 * Abstract {@link Sink} for implementing sort on long streams. |
|
508 */ |
|
509 private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> { |
|
510 protected boolean cancellationWasRequested; |
|
511 |
|
512 AbstractLongSortingSink(Sink<? super Long> downstream) { |
|
513 super(downstream); |
|
514 } |
|
515 |
|
516 @Override |
|
517 public final boolean cancellationRequested() { |
|
518 cancellationWasRequested = true; |
|
519 return false; |
|
520 } |
|
521 } |
|
522 |
|
523 /** |
420 * {@link Sink} for implementing sort on SIZED long streams. |
524 * {@link Sink} for implementing sort on SIZED long streams. |
421 */ |
525 */ |
422 private static final class SizedLongSortingSink extends Sink.ChainedLong<Long> { |
526 private static final class SizedLongSortingSink extends AbstractLongSortingSink { |
423 private long[] array; |
527 private long[] array; |
424 private int offset; |
528 private int offset; |
425 |
529 |
426 SizedLongSortingSink(Sink<? super Long> downstream) { |
530 SizedLongSortingSink(Sink<? super Long> downstream) { |
427 super(downstream); |
531 super(downstream); |
436 |
540 |
437 @Override |
541 @Override |
438 public void end() { |
542 public void end() { |
439 Arrays.sort(array, 0, offset); |
543 Arrays.sort(array, 0, offset); |
440 downstream.begin(offset); |
544 downstream.begin(offset); |
441 for (int i = 0; i < offset; i++) |
545 if (!cancellationWasRequested) { |
442 downstream.accept(array[i]); |
546 for (int i = 0; i < offset; i++) |
|
547 downstream.accept(array[i]); |
|
548 } |
|
549 else { |
|
550 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
|
551 downstream.accept(array[i]); |
|
552 } |
443 downstream.end(); |
553 downstream.end(); |
444 array = null; |
554 array = null; |
445 } |
555 } |
446 |
556 |
447 @Override |
557 @Override |
470 @Override |
580 @Override |
471 public void end() { |
581 public void end() { |
472 long[] longs = b.asPrimitiveArray(); |
582 long[] longs = b.asPrimitiveArray(); |
473 Arrays.sort(longs); |
583 Arrays.sort(longs); |
474 downstream.begin(longs.length); |
584 downstream.begin(longs.length); |
475 for (long aLong : longs) |
585 if (!cancellationWasRequested) { |
476 downstream.accept(aLong); |
586 for (long aLong : longs) |
|
587 downstream.accept(aLong); |
|
588 } |
|
589 else { |
|
590 for (long aLong : longs) { |
|
591 if (downstream.cancellationRequested()) break; |
|
592 downstream.accept(aLong); |
|
593 } |
|
594 } |
477 downstream.end(); |
595 downstream.end(); |
478 } |
596 } |
479 |
597 |
480 @Override |
598 @Override |
481 public void accept(long t) { |
599 public void accept(long t) { |
482 b.accept(t); |
600 b.accept(t); |
483 } |
601 } |
484 } |
602 } |
485 |
603 |
486 /** |
604 /** |
|
605 * Abstract {@link Sink} for implementing sort on long streams. |
|
606 */ |
|
607 private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> { |
|
608 protected boolean cancellationWasRequested; |
|
609 |
|
610 AbstractDoubleSortingSink(Sink<? super Double> downstream) { |
|
611 super(downstream); |
|
612 } |
|
613 |
|
614 @Override |
|
615 public final boolean cancellationRequested() { |
|
616 cancellationWasRequested = true; |
|
617 return false; |
|
618 } |
|
619 } |
|
620 |
|
621 /** |
487 * {@link Sink} for implementing sort on SIZED double streams. |
622 * {@link Sink} for implementing sort on SIZED double streams. |
488 */ |
623 */ |
489 private static final class SizedDoubleSortingSink extends Sink.ChainedDouble<Double> { |
624 private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink { |
490 private double[] array; |
625 private double[] array; |
491 private int offset; |
626 private int offset; |
492 |
627 |
493 SizedDoubleSortingSink(Sink<? super Double> downstream) { |
628 SizedDoubleSortingSink(Sink<? super Double> downstream) { |
494 super(downstream); |
629 super(downstream); |
503 |
638 |
504 @Override |
639 @Override |
505 public void end() { |
640 public void end() { |
506 Arrays.sort(array, 0, offset); |
641 Arrays.sort(array, 0, offset); |
507 downstream.begin(offset); |
642 downstream.begin(offset); |
508 for (int i = 0; i < offset; i++) |
643 if (!cancellationWasRequested) { |
509 downstream.accept(array[i]); |
644 for (int i = 0; i < offset; i++) |
|
645 downstream.accept(array[i]); |
|
646 } |
|
647 else { |
|
648 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) |
|
649 downstream.accept(array[i]); |
|
650 } |
510 downstream.end(); |
651 downstream.end(); |
511 array = null; |
652 array = null; |
512 } |
653 } |
513 |
654 |
514 @Override |
655 @Override |
537 @Override |
678 @Override |
538 public void end() { |
679 public void end() { |
539 double[] doubles = b.asPrimitiveArray(); |
680 double[] doubles = b.asPrimitiveArray(); |
540 Arrays.sort(doubles); |
681 Arrays.sort(doubles); |
541 downstream.begin(doubles.length); |
682 downstream.begin(doubles.length); |
542 for (double aDouble : doubles) |
683 if (!cancellationWasRequested) { |
543 downstream.accept(aDouble); |
684 for (double aDouble : doubles) |
|
685 downstream.accept(aDouble); |
|
686 } |
|
687 else { |
|
688 for (double aDouble : doubles) { |
|
689 if (downstream.cancellationRequested()) break; |
|
690 downstream.accept(aDouble); |
|
691 } |
|
692 } |
544 downstream.end(); |
693 downstream.end(); |
545 } |
694 } |
546 |
695 |
547 @Override |
696 @Override |
548 public void accept(double t) { |
697 public void accept(double t) { |