src/java.base/share/classes/java/util/stream/IntPipeline.java
changeset 48593 fca88bbbafb9
parent 47216 71c04702a3d5
equal deleted inserted replaced
48592:ef70df777355 48593:fca88bbbafb9
     1 /*
     1 /*
     2  * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     7  * published by the Free Software Foundation.  Oracle designates this
   296         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
   296         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
   297                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   297                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
   298             @Override
   298             @Override
   299             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
   299             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
   300                 return new Sink.ChainedInt<Integer>(sink) {
   300                 return new Sink.ChainedInt<Integer>(sink) {
       
   301                     // true if cancellationRequested() has been called
       
   302                     boolean cancellationRequestedCalled;
       
   303 
       
   304                     // cache the consumer to avoid creation on every accepted element
       
   305                     IntConsumer downstreamAsInt = downstream::accept;
       
   306 
   301                     @Override
   307                     @Override
   302                     public void begin(long size) {
   308                     public void begin(long size) {
   303                         downstream.begin(-1);
   309                         downstream.begin(-1);
   304                     }
   310                     }
   305 
   311 
   306                     @Override
   312                     @Override
   307                     public void accept(int t) {
   313                     public void accept(int t) {
   308                         try (IntStream result = mapper.apply(t)) {
   314                         try (IntStream result = mapper.apply(t)) {
   309                             // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
   315                             if (result != null) {
   310                             if (result != null)
   316                                 if (!cancellationRequestedCalled) {
   311                                 result.sequential().forEach(i -> downstream.accept(i));
   317                                     result.sequential().forEach(downstreamAsInt);
       
   318                                 }
       
   319                                 else {
       
   320                                     var s = result.sequential().spliterator();
       
   321                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
       
   322                                 }
       
   323                             }
   312                         }
   324                         }
       
   325                     }
       
   326 
       
   327                     @Override
       
   328                     public boolean cancellationRequested() {
       
   329                         // If this method is called then an operation within the stream
       
   330                         // pipeline is short-circuiting (see AbstractPipeline.copyInto).
       
   331                         // Note that we cannot differentiate between an upstream or
       
   332                         // downstream operation
       
   333                         cancellationRequestedCalled = true;
       
   334                         return downstream.cancellationRequested();
   313                     }
   335                     }
   314                 };
   336                 };
   315             }
   337             }
   316         };
   338         };
   317     }
   339     }