src/java.base/share/classes/java/util/stream/ReferencePipeline.java
changeset 48593 fca88bbbafb9
parent 47216 71c04702a3d5
equal deleted inserted replaced
48592:ef70df777355 48593:fca88bbbafb9
     1 /*
     1 /*
     2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     7  * published by the Free Software Foundation.  Oracle designates this
   251     }
   251     }
   252 
   252 
   253     @Override
   253     @Override
   254     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
   254     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
   255         Objects.requireNonNull(mapper);
   255         Objects.requireNonNull(mapper);
   256         // We can do better than this, by polling cancellationRequested when stream is infinite
       
   257         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
   256         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
   258                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   257                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   259             @Override
   258             @Override
   260             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
   259             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
   261                 return new Sink.ChainedReference<P_OUT, R>(sink) {
   260                 return new Sink.ChainedReference<P_OUT, R>(sink) {
       
   261                     // true if cancellationRequested() has been called
       
   262                     boolean cancellationRequestedCalled;
       
   263 
   262                     @Override
   264                     @Override
   263                     public void begin(long size) {
   265                     public void begin(long size) {
   264                         downstream.begin(-1);
   266                         downstream.begin(-1);
   265                     }
   267                     }
   266 
   268 
   267                     @Override
   269                     @Override
   268                     public void accept(P_OUT u) {
   270                     public void accept(P_OUT u) {
   269                         try (Stream<? extends R> result = mapper.apply(u)) {
   271                         try (Stream<? extends R> result = mapper.apply(u)) {
   270                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
   272                             if (result != null) {
   271                             if (result != null)
   273                                 if (!cancellationRequestedCalled) {
   272                                 result.sequential().forEach(downstream);
   274                                     result.sequential().forEach(downstream);
       
   275                                 }
       
   276                                 else {
       
   277                                     var s = result.sequential().spliterator();
       
   278                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
       
   279                                 }
       
   280                             }
   273                         }
   281                         }
       
   282                     }
       
   283 
       
   284                     @Override
       
   285                     public boolean cancellationRequested() {
       
   286                         // If this method is called then an operation within the stream
       
   287                         // pipeline is short-circuiting (see AbstractPipeline.copyInto).
       
   288                         // Note that we cannot differentiate between an upstream or
       
   289                         // downstream operation
       
   290                         cancellationRequestedCalled = true;
       
   291                         return downstream.cancellationRequested();
   274                     }
   292                     }
   275                 };
   293                 };
   276             }
   294             }
   277         };
   295         };
   278     }
   296     }
   279 
   297 
   280     @Override
   298     @Override
   281     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
   299     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
   282         Objects.requireNonNull(mapper);
   300         Objects.requireNonNull(mapper);
   283         // We can do better than this, by polling cancellationRequested when stream is infinite
       
   284         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
   301         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
   285                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   302                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   286             @Override
   303             @Override
   287             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
   304             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
   288                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
   305                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
       
   306                     // true if cancellationRequested() has been called
       
   307                     boolean cancellationRequestedCalled;
       
   308 
       
   309                     // cache the consumer to avoid creation on every accepted element
   289                     IntConsumer downstreamAsInt = downstream::accept;
   310                     IntConsumer downstreamAsInt = downstream::accept;
       
   311 
   290                     @Override
   312                     @Override
   291                     public void begin(long size) {
   313                     public void begin(long size) {
   292                         downstream.begin(-1);
   314                         downstream.begin(-1);
   293                     }
   315                     }
   294 
   316 
   295                     @Override
   317                     @Override
   296                     public void accept(P_OUT u) {
   318                     public void accept(P_OUT u) {
   297                         try (IntStream result = mapper.apply(u)) {
   319                         try (IntStream result = mapper.apply(u)) {
   298                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
   320                             if (result != null) {
   299                             if (result != null)
   321                                 if (!cancellationRequestedCalled) {
   300                                 result.sequential().forEach(downstreamAsInt);
   322                                     result.sequential().forEach(downstreamAsInt);
       
   323                                 }
       
   324                                 else {
       
   325                                     var s = result.sequential().spliterator();
       
   326                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
       
   327                                 }
       
   328                             }
   301                         }
   329                         }
       
   330                     }
       
   331 
       
   332                     @Override
       
   333                     public boolean cancellationRequested() {
       
   334                         cancellationRequestedCalled = true;
       
   335                         return downstream.cancellationRequested();
   302                     }
   336                     }
   303                 };
   337                 };
   304             }
   338             }
   305         };
   339         };
   306     }
   340     }
   307 
   341 
   308     @Override
   342     @Override
   309     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
   343     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
   310         Objects.requireNonNull(mapper);
   344         Objects.requireNonNull(mapper);
   311         // We can do better than this, by polling cancellationRequested when stream is infinite
       
   312         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
   345         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
   313                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   346                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   314             @Override
   347             @Override
   315             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
   348             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
   316                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
   349                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
       
   350                     // true if cancellationRequested() has been called
       
   351                     boolean cancellationRequestedCalled;
       
   352 
       
   353                     // cache the consumer to avoid creation on every accepted element
   317                     DoubleConsumer downstreamAsDouble = downstream::accept;
   354                     DoubleConsumer downstreamAsDouble = downstream::accept;
       
   355 
   318                     @Override
   356                     @Override
   319                     public void begin(long size) {
   357                     public void begin(long size) {
   320                         downstream.begin(-1);
   358                         downstream.begin(-1);
   321                     }
   359                     }
   322 
   360 
   323                     @Override
   361                     @Override
   324                     public void accept(P_OUT u) {
   362                     public void accept(P_OUT u) {
   325                         try (DoubleStream result = mapper.apply(u)) {
   363                         try (DoubleStream result = mapper.apply(u)) {
   326                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
   364                             if (result != null) {
   327                             if (result != null)
   365                                 if (!cancellationRequestedCalled) {
   328                                 result.sequential().forEach(downstreamAsDouble);
   366                                     result.sequential().forEach(downstreamAsDouble);
       
   367                                 }
       
   368                                 else {
       
   369                                     var s = result.sequential().spliterator();
       
   370                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
       
   371                                 }
       
   372                             }
   329                         }
   373                         }
       
   374                     }
       
   375 
       
   376                     @Override
       
   377                     public boolean cancellationRequested() {
       
   378                         cancellationRequestedCalled = true;
       
   379                         return downstream.cancellationRequested();
   330                     }
   380                     }
   331                 };
   381                 };
   332             }
   382             }
   333         };
   383         };
   334     }
   384     }
   340         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
   390         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
   341                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   391                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   342             @Override
   392             @Override
   343             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
   393             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
   344                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
   394                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
       
   395                     // true if cancellationRequested() has been called
       
   396                     boolean cancellationRequestedCalled;
       
   397 
       
   398                     // cache the consumer to avoid creation on every accepted element
   345                     LongConsumer downstreamAsLong = downstream::accept;
   399                     LongConsumer downstreamAsLong = downstream::accept;
       
   400 
   346                     @Override
   401                     @Override
   347                     public void begin(long size) {
   402                     public void begin(long size) {
   348                         downstream.begin(-1);
   403                         downstream.begin(-1);
   349                     }
   404                     }
   350 
   405 
   351                     @Override
   406                     @Override
   352                     public void accept(P_OUT u) {
   407                     public void accept(P_OUT u) {
   353                         try (LongStream result = mapper.apply(u)) {
   408                         try (LongStream result = mapper.apply(u)) {
   354                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
   409                             if (result != null) {
   355                             if (result != null)
   410                                 if (!cancellationRequestedCalled) {
   356                                 result.sequential().forEach(downstreamAsLong);
   411                                     result.sequential().forEach(downstreamAsLong);
       
   412                                 }
       
   413                                 else {
       
   414                                     var s = result.sequential().spliterator();
       
   415                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
       
   416                                 }
       
   417                             }
   357                         }
   418                         }
       
   419                     }
       
   420 
       
   421                     @Override
       
   422                     public boolean cancellationRequested() {
       
   423                         cancellationRequestedCalled = true;
       
   424                         return downstream.cancellationRequested();
   358                     }
   425                     }
   359                 };
   426                 };
   360             }
   427             }
   361         };
   428         };
   362     }
   429     }