jdk/src/share/classes/java/util/stream/SortedOps.java
changeset 24258 0e9ab834f44a
parent 22297 1c62c67d9dd2
equal deleted inserted replaced
24257:f524e23d7f7b 24258:0e9ab834f44a
   277             }
   277             }
   278         }
   278         }
   279     }
   279     }
   280 
   280 
   281     /**
   281     /**
       
   282      * Abstract {@link Sink} for implementing sort on reference streams.
       
   283      *
       
   284      * <p>
       
   285      * Note: documentation below applies to reference and all primitive sinks.
       
   286      * <p>
       
   287      * Sorting sinks first accept all elements, buffering them into an array
       
   288      * or a re-sizable data structure, if the size of the pipeline is known or
       
   289      * unknown respectively.  At the end of the sink protocol those elements are
       
   290      * sorted and then pushed downstream.
       
   291      * This class records if {@link #cancellationRequested} is called.  If so it
       
   292      * can be inferred that the source pushing source elements into the pipeline
       
   293      * knows that the pipeline is short-circuiting.  In such cases sub-classes
       
   294      * pushing elements downstream will preserve the short-circuiting protocol
       
   295      * by calling {@code downstream.cancellationRequested()} and checking the
       
   296      * result is {@code false} before an element is pushed.
       
   297      * <p>
       
   298      * Note that the above behaviour is an optimization for sorting with
       
   299      * sequential streams.  It is not an error that more elements, than strictly
       
   300      * required to produce a result, may flow through the pipeline.  This can
       
   301      * occur, in general (not restricted to just sorting), for short-circuiting
       
   302      * parallel pipelines.
       
   303      */
       
   304     private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
       
   305         protected final Comparator<? super T> comparator;
       
   306         // @@@ could be a lazy final value, if/when support is added
       
   307         protected boolean cancellationWasRequested;
       
   308 
       
   309         AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
       
   310             super(downstream);
       
   311             this.comparator = comparator;
       
   312         }
       
   313 
       
   314         /**
       
   315          * Records if cancellation is requested so short-circuiting behaviour
       
   316          * can be preserved when the sorted elements are pushed downstream.
       
   317          *
       
   318          * @return false, as this sink never short-circuits.
       
   319          */
       
   320         @Override
       
   321         public final boolean cancellationRequested() {
       
   322             cancellationWasRequested = true;
       
   323             return false;
       
   324         }
       
   325     }
       
   326 
       
   327     /**
   282      * {@link Sink} for implementing sort on SIZED reference streams.
   328      * {@link Sink} for implementing sort on SIZED reference streams.
   283      */
   329      */
   284     private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T, T> {
   330     private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
   285         private final Comparator<? super T> comparator;
       
   286         private T[] array;
   331         private T[] array;
   287         private int offset;
   332         private int offset;
   288 
   333 
   289         SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
   334         SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
   290             super(sink);
   335             super(sink, comparator);
   291             this.comparator = comparator;
       
   292         }
   336         }
   293 
   337 
   294         @Override
   338         @Override
   295         @SuppressWarnings("unchecked")
   339         @SuppressWarnings("unchecked")
   296         public void begin(long size) {
   340         public void begin(long size) {
   301 
   345 
   302         @Override
   346         @Override
   303         public void end() {
   347         public void end() {
   304             Arrays.sort(array, 0, offset, comparator);
   348             Arrays.sort(array, 0, offset, comparator);
   305             downstream.begin(offset);
   349             downstream.begin(offset);
   306             for (int i = 0; i < offset; i++)
   350             if (!cancellationWasRequested) {
   307                 downstream.accept(array[i]);
   351                 for (int i = 0; i < offset; i++)
       
   352                     downstream.accept(array[i]);
       
   353             }
       
   354             else {
       
   355                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
       
   356                     downstream.accept(array[i]);
       
   357             }
   308             downstream.end();
   358             downstream.end();
   309             array = null;
   359             array = null;
   310         }
   360         }
   311 
   361 
   312         @Override
   362         @Override
   316     }
   366     }
   317 
   367 
   318     /**
   368     /**
   319      * {@link Sink} for implementing sort on reference streams.
   369      * {@link Sink} for implementing sort on reference streams.
   320      */
   370      */
   321     private static final class RefSortingSink<T> extends Sink.ChainedReference<T, T> {
   371     private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
   322         private final Comparator<? super T> comparator;
       
   323         private ArrayList<T> list;
   372         private ArrayList<T> list;
   324 
   373 
   325         RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
   374         RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
   326             super(sink);
   375             super(sink, comparator);
   327             this.comparator = comparator;
       
   328         }
   376         }
   329 
   377 
   330         @Override
   378         @Override
   331         public void begin(long size) {
   379         public void begin(long size) {
   332             if (size >= Nodes.MAX_ARRAY_SIZE)
   380             if (size >= Nodes.MAX_ARRAY_SIZE)
   336 
   384 
   337         @Override
   385         @Override
   338         public void end() {
   386         public void end() {
   339             list.sort(comparator);
   387             list.sort(comparator);
   340             downstream.begin(list.size());
   388             downstream.begin(list.size());
   341             list.forEach(downstream::accept);
   389             if (!cancellationWasRequested) {
       
   390                 list.forEach(downstream::accept);
       
   391             }
       
   392             else {
       
   393                 for (T t : list) {
       
   394                     if (downstream.cancellationRequested()) break;
       
   395                     downstream.accept(t);
       
   396                 }
       
   397             }
   342             downstream.end();
   398             downstream.end();
   343             list = null;
   399             list = null;
   344         }
   400         }
   345 
   401 
   346         @Override
   402         @Override
   348             list.add(t);
   404             list.add(t);
   349         }
   405         }
   350     }
   406     }
   351 
   407 
   352     /**
   408     /**
       
   409      * Abstract {@link Sink} for implementing sort on int streams.
       
   410      */
       
   411     private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
       
   412         protected boolean cancellationWasRequested;
       
   413 
       
   414         AbstractIntSortingSink(Sink<? super Integer> downstream) {
       
   415             super(downstream);
       
   416         }
       
   417 
       
   418         @Override
       
   419         public final boolean cancellationRequested() {
       
   420             cancellationWasRequested = true;
       
   421             return false;
       
   422         }
       
   423     }
       
   424 
       
   425     /**
   353      * {@link Sink} for implementing sort on SIZED int streams.
   426      * {@link Sink} for implementing sort on SIZED int streams.
   354      */
   427      */
   355     private static final class SizedIntSortingSink extends Sink.ChainedInt<Integer> {
   428     private static final class SizedIntSortingSink extends AbstractIntSortingSink {
   356         private int[] array;
   429         private int[] array;
   357         private int offset;
   430         private int offset;
   358 
   431 
   359         SizedIntSortingSink(Sink<? super Integer> downstream) {
   432         SizedIntSortingSink(Sink<? super Integer> downstream) {
   360             super(downstream);
   433             super(downstream);
   369 
   442 
   370         @Override
   443         @Override
   371         public void end() {
   444         public void end() {
   372             Arrays.sort(array, 0, offset);
   445             Arrays.sort(array, 0, offset);
   373             downstream.begin(offset);
   446             downstream.begin(offset);
   374             for (int i = 0; i < offset; i++)
   447             if (!cancellationWasRequested) {
   375                 downstream.accept(array[i]);
   448                 for (int i = 0; i < offset; i++)
       
   449                     downstream.accept(array[i]);
       
   450             }
       
   451             else {
       
   452                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
       
   453                     downstream.accept(array[i]);
       
   454             }
   376             downstream.end();
   455             downstream.end();
   377             array = null;
   456             array = null;
   378         }
   457         }
   379 
   458 
   380         @Override
   459         @Override
   384     }
   463     }
   385 
   464 
   386     /**
   465     /**
   387      * {@link Sink} for implementing sort on int streams.
   466      * {@link Sink} for implementing sort on int streams.
   388      */
   467      */
   389     private static final class IntSortingSink extends Sink.ChainedInt<Integer> {
   468     private static final class IntSortingSink extends AbstractIntSortingSink {
   390         private SpinedBuffer.OfInt b;
   469         private SpinedBuffer.OfInt b;
   391 
   470 
   392         IntSortingSink(Sink<? super Integer> sink) {
   471         IntSortingSink(Sink<? super Integer> sink) {
   393             super(sink);
   472             super(sink);
   394         }
   473         }
   403         @Override
   482         @Override
   404         public void end() {
   483         public void end() {
   405             int[] ints = b.asPrimitiveArray();
   484             int[] ints = b.asPrimitiveArray();
   406             Arrays.sort(ints);
   485             Arrays.sort(ints);
   407             downstream.begin(ints.length);
   486             downstream.begin(ints.length);
   408             for (int anInt : ints)
   487             if (!cancellationWasRequested) {
   409                 downstream.accept(anInt);
   488                 for (int anInt : ints)
       
   489                     downstream.accept(anInt);
       
   490             }
       
   491             else {
       
   492                 for (int anInt : ints) {
       
   493                     if (downstream.cancellationRequested()) break;
       
   494                     downstream.accept(anInt);
       
   495                 }
       
   496             }
   410             downstream.end();
   497             downstream.end();
   411         }
   498         }
   412 
   499 
   413         @Override
   500         @Override
   414         public void accept(int t) {
   501         public void accept(int t) {
   415             b.accept(t);
   502             b.accept(t);
   416         }
   503         }
   417     }
   504     }
   418 
   505 
   419     /**
   506     /**
       
   507      * Abstract {@link Sink} for implementing sort on long streams.
       
   508      */
       
   509     private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
       
   510         protected boolean cancellationWasRequested;
       
   511 
       
   512         AbstractLongSortingSink(Sink<? super Long> downstream) {
       
   513             super(downstream);
       
   514         }
       
   515 
       
   516         @Override
       
   517         public final boolean cancellationRequested() {
       
   518             cancellationWasRequested = true;
       
   519             return false;
       
   520         }
       
   521     }
       
   522 
       
   523     /**
   420      * {@link Sink} for implementing sort on SIZED long streams.
   524      * {@link Sink} for implementing sort on SIZED long streams.
   421      */
   525      */
   422     private static final class SizedLongSortingSink extends Sink.ChainedLong<Long> {
   526     private static final class SizedLongSortingSink extends AbstractLongSortingSink {
   423         private long[] array;
   527         private long[] array;
   424         private int offset;
   528         private int offset;
   425 
   529 
   426         SizedLongSortingSink(Sink<? super Long> downstream) {
   530         SizedLongSortingSink(Sink<? super Long> downstream) {
   427             super(downstream);
   531             super(downstream);
   436 
   540 
   437         @Override
   541         @Override
   438         public void end() {
   542         public void end() {
   439             Arrays.sort(array, 0, offset);
   543             Arrays.sort(array, 0, offset);
   440             downstream.begin(offset);
   544             downstream.begin(offset);
   441             for (int i = 0; i < offset; i++)
   545             if (!cancellationWasRequested) {
   442                 downstream.accept(array[i]);
   546                 for (int i = 0; i < offset; i++)
       
   547                     downstream.accept(array[i]);
       
   548             }
       
   549             else {
       
   550                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
       
   551                     downstream.accept(array[i]);
       
   552             }
   443             downstream.end();
   553             downstream.end();
   444             array = null;
   554             array = null;
   445         }
   555         }
   446 
   556 
   447         @Override
   557         @Override
   451     }
   561     }
   452 
   562 
   453     /**
   563     /**
   454      * {@link Sink} for implementing sort on long streams.
   564      * {@link Sink} for implementing sort on long streams.
   455      */
   565      */
   456     private static final class LongSortingSink extends Sink.ChainedLong<Long> {
   566     private static final class LongSortingSink extends AbstractLongSortingSink {
   457         private SpinedBuffer.OfLong b;
   567         private SpinedBuffer.OfLong b;
   458 
   568 
   459         LongSortingSink(Sink<? super Long> sink) {
   569         LongSortingSink(Sink<? super Long> sink) {
   460             super(sink);
   570             super(sink);
   461         }
   571         }
   470         @Override
   580         @Override
   471         public void end() {
   581         public void end() {
   472             long[] longs = b.asPrimitiveArray();
   582             long[] longs = b.asPrimitiveArray();
   473             Arrays.sort(longs);
   583             Arrays.sort(longs);
   474             downstream.begin(longs.length);
   584             downstream.begin(longs.length);
   475             for (long aLong : longs)
   585             if (!cancellationWasRequested) {
   476                 downstream.accept(aLong);
   586                 for (long aLong : longs)
       
   587                     downstream.accept(aLong);
       
   588             }
       
   589             else {
       
   590                 for (long aLong : longs) {
       
   591                     if (downstream.cancellationRequested()) break;
       
   592                     downstream.accept(aLong);
       
   593                 }
       
   594             }
   477             downstream.end();
   595             downstream.end();
   478         }
   596         }
   479 
   597 
   480         @Override
   598         @Override
   481         public void accept(long t) {
   599         public void accept(long t) {
   482             b.accept(t);
   600             b.accept(t);
   483         }
   601         }
   484     }
   602     }
   485 
   603 
   486     /**
   604     /**
       
   605      * Abstract {@link Sink} for implementing sort on double streams.
       
   606      */
       
   607     private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
       
   608         protected boolean cancellationWasRequested;
       
   609 
       
   610         AbstractDoubleSortingSink(Sink<? super Double> downstream) {
       
   611             super(downstream);
       
   612         }
       
   613 
       
   614         @Override
       
   615         public final boolean cancellationRequested() {
       
   616             cancellationWasRequested = true;
       
   617             return false;
       
   618         }
       
   619     }
       
   620 
       
   621     /**
   487      * {@link Sink} for implementing sort on SIZED double streams.
   622      * {@link Sink} for implementing sort on SIZED double streams.
   488      */
   623      */
   489     private static final class SizedDoubleSortingSink extends Sink.ChainedDouble<Double> {
   624     private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink {
   490         private double[] array;
   625         private double[] array;
   491         private int offset;
   626         private int offset;
   492 
   627 
   493         SizedDoubleSortingSink(Sink<? super Double> downstream) {
   628         SizedDoubleSortingSink(Sink<? super Double> downstream) {
   494             super(downstream);
   629             super(downstream);
   503 
   638 
   504         @Override
   639         @Override
   505         public void end() {
   640         public void end() {
   506             Arrays.sort(array, 0, offset);
   641             Arrays.sort(array, 0, offset);
   507             downstream.begin(offset);
   642             downstream.begin(offset);
   508             for (int i = 0; i < offset; i++)
   643             if (!cancellationWasRequested) {
   509                 downstream.accept(array[i]);
   644                 for (int i = 0; i < offset; i++)
       
   645                     downstream.accept(array[i]);
       
   646             }
       
   647             else {
       
   648                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
       
   649                     downstream.accept(array[i]);
       
   650             }
   510             downstream.end();
   651             downstream.end();
   511             array = null;
   652             array = null;
   512         }
   653         }
   513 
   654 
   514         @Override
   655         @Override
   518     }
   659     }
   519 
   660 
   520     /**
   661     /**
   521      * {@link Sink} for implementing sort on double streams.
   662      * {@link Sink} for implementing sort on double streams.
   522      */
   663      */
   523     private static final class DoubleSortingSink extends Sink.ChainedDouble<Double> {
   664     private static final class DoubleSortingSink extends AbstractDoubleSortingSink {
   524         private SpinedBuffer.OfDouble b;
   665         private SpinedBuffer.OfDouble b;
   525 
   666 
   526         DoubleSortingSink(Sink<? super Double> sink) {
   667         DoubleSortingSink(Sink<? super Double> sink) {
   527             super(sink);
   668             super(sink);
   528         }
   669         }
   537         @Override
   678         @Override
   538         public void end() {
   679         public void end() {
   539             double[] doubles = b.asPrimitiveArray();
   680             double[] doubles = b.asPrimitiveArray();
   540             Arrays.sort(doubles);
   681             Arrays.sort(doubles);
   541             downstream.begin(doubles.length);
   682             downstream.begin(doubles.length);
   542             for (double aDouble : doubles)
   683             if (!cancellationWasRequested) {
   543                 downstream.accept(aDouble);
   684                 for (double aDouble : doubles)
       
   685                     downstream.accept(aDouble);
       
   686             }
       
   687             else {
       
   688                 for (double aDouble : doubles) {
       
   689                     if (downstream.cancellationRequested()) break;
       
   690                     downstream.accept(aDouble);
       
   691                 }
       
   692             }
   544             downstream.end();
   693             downstream.end();
   545         }
   694         }
   546 
   695 
   547         @Override
   696         @Override
   548         public void accept(double t) {
   697         public void accept(double t) {