jdk/src/share/classes/java/util/stream/LongPipeline.java
changeset 17182 b786c0de868c
child 18154 5ede18269905
equal deleted inserted replaced
17181:e3d13a15c5c0 17182:b786c0de868c
       
     1 /*
       
     2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 package java.util.stream;
       
    26 
       
    27 import java.util.LongSummaryStatistics;
       
    28 import java.util.Objects;
       
    29 import java.util.OptionalDouble;
       
    30 import java.util.OptionalLong;
       
    31 import java.util.PrimitiveIterator;
       
    32 import java.util.Spliterator;
       
    33 import java.util.Spliterators;
       
    34 import java.util.function.BiConsumer;
       
    35 import java.util.function.BinaryOperator;
       
    36 import java.util.function.IntFunction;
       
    37 import java.util.function.LongBinaryOperator;
       
    38 import java.util.function.LongConsumer;
       
    39 import java.util.function.LongFunction;
       
    40 import java.util.function.LongPredicate;
       
    41 import java.util.function.LongToDoubleFunction;
       
    42 import java.util.function.LongToIntFunction;
       
    43 import java.util.function.LongUnaryOperator;
       
    44 import java.util.function.ObjLongConsumer;
       
    45 import java.util.function.Supplier;
       
    46 
       
    47 /**
       
    48  * Abstract base class for an intermediate pipeline stage or pipeline source
       
    49  * stage implementing whose elements are of type {@code long}.
       
    50  *
       
    51  * @param <E_IN> type of elements in the upstream source
       
    52  * @since 1.8
       
    53  */
       
    54 abstract class LongPipeline<E_IN>
       
    55         extends AbstractPipeline<E_IN, Long, LongStream>
       
    56         implements LongStream {
       
    57 
       
    58     /**
       
    59      * Constructor for the head of a stream pipeline.
       
    60      *
       
    61      * @param source {@code Supplier<Spliterator>} describing the stream source
       
    62      * @param sourceFlags the source flags for the stream source, described in
       
    63      *        {@link StreamOpFlag}
       
    64      * @param parallel {@code true} if the pipeline is parallel
       
    65      */
       
    66     LongPipeline(Supplier<? extends Spliterator<Long>> source,
       
    67                  int sourceFlags, boolean parallel) {
       
    68         super(source, sourceFlags, parallel);
       
    69     }
       
    70 
       
    71     /**
       
    72      * Constructor for the head of a stream pipeline.
       
    73      *
       
    74      * @param source {@code Spliterator} describing the stream source
       
    75      * @param sourceFlags the source flags for the stream source, described in
       
    76      *        {@link StreamOpFlag}
       
    77      * @param parallel {@code true} if the pipeline is parallel
       
    78      */
       
    79     LongPipeline(Spliterator<Long> source,
       
    80                  int sourceFlags, boolean parallel) {
       
    81         super(source, sourceFlags, parallel);
       
    82     }
       
    83 
       
    84     /**
       
    85      * Constructor for appending an intermediate operation onto an existing pipeline.
       
    86      *
       
    87      * @param upstream the upstream element source.
       
    88      * @param opFlags the operation flags
       
    89      */
       
    90     LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
       
    91         super(upstream, opFlags);
       
    92     }
       
    93 
       
    94     /**
       
    95      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
       
    96      * by casting.
       
    97      */
       
    98     private static LongConsumer adapt(Sink<Long> sink) {
       
    99         if (sink instanceof LongConsumer) {
       
   100             return (LongConsumer) sink;
       
   101         } else {
       
   102             if (Tripwire.ENABLED)
       
   103                 Tripwire.trip(AbstractPipeline.class,
       
   104                               "using LongStream.adapt(Sink<Long> s)");
       
   105             return sink::accept;
       
   106         }
       
   107     }
       
   108 
       
   109     /**
       
   110      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
       
   111      *
       
   112      * @implNote
       
   113      * The implementation attempts to cast to a Spliterator.OfLong, and throws
       
   114      * an exception if this cast is not possible.
       
   115      */
       
   116     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
       
   117         if (s instanceof Spliterator.OfLong) {
       
   118             return (Spliterator.OfLong) s;
       
   119         } else {
       
   120             if (Tripwire.ENABLED)
       
   121                 Tripwire.trip(AbstractPipeline.class,
       
   122                               "using LongStream.adapt(Spliterator<Long> s)");
       
   123             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
       
   124         }
       
   125     }
       
   126 
       
   127 
       
   128     // Shape-specific methods
       
   129 
       
   130     @Override
       
   131     final StreamShape getOutputShape() {
       
   132         return StreamShape.LONG_VALUE;
       
   133     }
       
   134 
       
   135     @Override
       
   136     final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
       
   137                                            Spliterator<P_IN> spliterator,
       
   138                                            boolean flattenTree,
       
   139                                            IntFunction<Long[]> generator) {
       
   140         return Nodes.collectLong(helper, spliterator, flattenTree);
       
   141     }
       
   142 
       
   143     @Override
       
   144     final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
       
   145                                         Supplier<Spliterator<P_IN>> supplier,
       
   146                                         boolean isParallel) {
       
   147         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
       
   148     }
       
   149 
       
   150     @Override
       
   151     final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
       
   152         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
       
   153     }
       
   154 
       
   155     @Override
       
   156     final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
       
   157         Spliterator.OfLong spl = adapt(spliterator);
       
   158         LongConsumer adaptedSink =  adapt(sink);
       
   159         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
       
   160     }
       
   161 
       
   162     @Override
       
   163     final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
       
   164         return Nodes.longBuilder(exactSizeIfKnown);
       
   165     }
       
   166 
       
   167 
       
   168     // LongStream
       
   169 
       
   170     @Override
       
   171     public final PrimitiveIterator.OfLong iterator() {
       
   172         return Spliterators.iteratorFromSpliterator(spliterator());
       
   173     }
       
   174 
       
   175     @Override
       
   176     public final Spliterator.OfLong spliterator() {
       
   177         return adapt(super.spliterator());
       
   178     }
       
   179 
       
   180     // Stateless intermediate ops from LongStream
       
   181 
       
   182     @Override
       
   183     public final DoubleStream doubles() {
       
   184         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
       
   185                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
       
   186             @Override
       
   187             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
       
   188                 return new Sink.ChainedLong(sink) {
       
   189                     @Override
       
   190                     public void accept(long t) {
       
   191                         downstream.accept((double) t);
       
   192                     }
       
   193                 };
       
   194             }
       
   195         };
       
   196     }
       
   197 
       
   198     @Override
       
   199     public final Stream<Long> boxed() {
       
   200         return mapToObj(Long::valueOf);
       
   201     }
       
   202 
       
   203     @Override
       
   204     public final LongStream map(LongUnaryOperator mapper) {
       
   205         Objects.requireNonNull(mapper);
       
   206         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
       
   207                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
       
   208             @Override
       
   209             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
       
   210                 return new Sink.ChainedLong(sink) {
       
   211                     @Override
       
   212                     public void accept(long t) {
       
   213                         downstream.accept(mapper.applyAsLong(t));
       
   214                     }
       
   215                 };
       
   216             }
       
   217         };
       
   218     }
       
   219 
       
   220     @Override
       
   221     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
       
   222         Objects.requireNonNull(mapper);
       
   223         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
       
   224                                                           StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
       
   225             @Override
       
   226             Sink<Long> opWrapSink(int flags, Sink<U> sink) {
       
   227                 return new Sink.ChainedLong(sink) {
       
   228                     @Override
       
   229                     public void accept(long t) {
       
   230                         downstream.accept(mapper.apply(t));
       
   231                     }
       
   232                 };
       
   233             }
       
   234         };
       
   235     }
       
   236 
       
   237     @Override
       
   238     public final IntStream mapToInt(LongToIntFunction mapper) {
       
   239         Objects.requireNonNull(mapper);
       
   240         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
       
   241                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
       
   242             @Override
       
   243             Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
       
   244                 return new Sink.ChainedLong(sink) {
       
   245                     @Override
       
   246                     public void accept(long t) {
       
   247                         downstream.accept(mapper.applyAsInt(t));
       
   248                     }
       
   249                 };
       
   250             }
       
   251         };
       
   252     }
       
   253 
       
   254     @Override
       
   255     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
       
   256         Objects.requireNonNull(mapper);
       
   257         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
       
   258                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
       
   259             @Override
       
   260             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
       
   261                 return new Sink.ChainedLong(sink) {
       
   262                     @Override
       
   263                     public void accept(long t) {
       
   264                         downstream.accept(mapper.applyAsDouble(t));
       
   265                     }
       
   266                 };
       
   267             }
       
   268         };
       
   269     }
       
   270 
       
   271     @Override
       
   272     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
       
   273         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
       
   274                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
       
   275             @Override
       
   276             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
       
   277                 return new Sink.ChainedLong(sink) {
       
   278                     public void accept(long t) {
       
   279                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
       
   280                         LongStream result = mapper.apply(t);
       
   281                         if (result != null)
       
   282                             result.sequential().forEach(i -> downstream.accept(i));
       
   283                     }
       
   284                 };
       
   285             }
       
   286         };
       
   287     }
       
   288 
       
   289     @Override
       
   290     public LongStream unordered() {
       
   291         if (!isOrdered())
       
   292             return this;
       
   293         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
       
   294             @Override
       
   295             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
       
   296                 return sink;
       
   297             }
       
   298         };
       
   299     }
       
   300 
       
   301     @Override
       
   302     public final LongStream filter(LongPredicate predicate) {
       
   303         Objects.requireNonNull(predicate);
       
   304         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
       
   305                                      StreamOpFlag.NOT_SIZED) {
       
   306             @Override
       
   307             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
       
   308                 return new Sink.ChainedLong(sink) {
       
   309                     @Override
       
   310                     public void accept(long t) {
       
   311                         if (predicate.test(t))
       
   312                             downstream.accept(t);
       
   313                     }
       
   314                 };
       
   315             }
       
   316         };
       
   317     }
       
   318 
       
   319     @Override
       
   320     public final LongStream peek(LongConsumer consumer) {
       
   321         Objects.requireNonNull(consumer);
       
   322         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
       
   323                                      0) {
       
   324             @Override
       
   325             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
       
   326                 return new Sink.ChainedLong(sink) {
       
   327                     @Override
       
   328                     public void accept(long t) {
       
   329                         consumer.accept(t);
       
   330                         downstream.accept(t);
       
   331                     }
       
   332                 };
       
   333             }
       
   334         };
       
   335     }
       
   336 
       
   337     // Stateful intermediate ops from LongStream
       
   338 
       
   339     private LongStream slice(long skip, long limit) {
       
   340         return SliceOps.makeLong(this, skip, limit);
       
   341     }
       
   342 
       
   343     @Override
       
   344     public final LongStream limit(long maxSize) {
       
   345         if (maxSize < 0)
       
   346             throw new IllegalArgumentException(Long.toString(maxSize));
       
   347         return slice(0, maxSize);
       
   348     }
       
   349 
       
   350     @Override
       
   351     public final LongStream substream(long startingOffset) {
       
   352         if (startingOffset < 0)
       
   353             throw new IllegalArgumentException(Long.toString(startingOffset));
       
   354         if (startingOffset == 0)
       
   355             return this;
       
   356         else
       
   357             return slice(startingOffset, -1);
       
   358     }
       
   359 
       
   360     @Override
       
   361     public final LongStream substream(long startingOffset, long endingOffset) {
       
   362         if (startingOffset < 0 || endingOffset < startingOffset)
       
   363             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
       
   364         return slice(startingOffset, endingOffset - startingOffset);
       
   365     }
       
   366 
       
   367     @Override
       
   368     public final LongStream sorted() {
       
   369         return SortedOps.makeLong(this);
       
   370     }
       
   371 
       
   372     @Override
       
   373     public final LongStream distinct() {
       
   374         // While functional and quick to implement, this approach is not very efficient.
       
   375         // An efficient version requires a long-specific map/set implementation.
       
   376         return boxed().distinct().mapToLong(i -> (long) i);
       
   377     }
       
   378 
       
   379     // Terminal ops from LongStream
       
   380 
       
   381     @Override
       
   382     public void forEach(LongConsumer action) {
       
   383         evaluate(ForEachOps.makeLong(action, false));
       
   384     }
       
   385 
       
   386     @Override
       
   387     public void forEachOrdered(LongConsumer action) {
       
   388         evaluate(ForEachOps.makeLong(action, true));
       
   389     }
       
   390 
       
   391     @Override
       
   392     public final long sum() {
       
   393         // use better algorithm to compensate for intermediate overflow?
       
   394         return reduce(0, Long::sum);
       
   395     }
       
   396 
       
   397     @Override
       
   398     public final OptionalLong min() {
       
   399         return reduce(Math::min);
       
   400     }
       
   401 
       
   402     @Override
       
   403     public final OptionalLong max() {
       
   404         return reduce(Math::max);
       
   405     }
       
   406 
       
   407     @Override
       
   408     public final OptionalDouble average() {
       
   409         long[] avg = collect(() -> new long[2],
       
   410                              (ll, i) -> {
       
   411                                  ll[0]++;
       
   412                                  ll[1] += i;
       
   413                              },
       
   414                              (ll, rr) -> {
       
   415                                  ll[0] += rr[0];
       
   416                                  ll[1] += rr[1];
       
   417                              });
       
   418         return avg[0] > 0
       
   419                ? OptionalDouble.of((double) avg[1] / avg[0])
       
   420                : OptionalDouble.empty();
       
   421     }
       
   422 
       
   423     @Override
       
   424     public final long count() {
       
   425         return map(e -> 1L).sum();
       
   426     }
       
   427 
       
   428     @Override
       
   429     public final LongSummaryStatistics summaryStatistics() {
       
   430         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
       
   431                        LongSummaryStatistics::combine);
       
   432     }
       
   433 
       
   434     @Override
       
   435     public final long reduce(long identity, LongBinaryOperator op) {
       
   436         return evaluate(ReduceOps.makeLong(identity, op));
       
   437     }
       
   438 
       
   439     @Override
       
   440     public final OptionalLong reduce(LongBinaryOperator op) {
       
   441         return evaluate(ReduceOps.makeLong(op));
       
   442     }
       
   443 
       
   444     @Override
       
   445     public final <R> R collect(Supplier<R> resultFactory,
       
   446                                ObjLongConsumer<R> accumulator,
       
   447                                BiConsumer<R, R> combiner) {
       
   448         BinaryOperator<R> operator = (left, right) -> {
       
   449             combiner.accept(left, right);
       
   450             return left;
       
   451         };
       
   452         return evaluate(ReduceOps.makeLong(resultFactory, accumulator, operator));
       
   453     }
       
   454 
       
   455     @Override
       
   456     public final boolean anyMatch(LongPredicate predicate) {
       
   457         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
       
   458     }
       
   459 
       
   460     @Override
       
   461     public final boolean allMatch(LongPredicate predicate) {
       
   462         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
       
   463     }
       
   464 
       
   465     @Override
       
   466     public final boolean noneMatch(LongPredicate predicate) {
       
   467         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
       
   468     }
       
   469 
       
   470     @Override
       
   471     public final OptionalLong findFirst() {
       
   472         return evaluate(FindOps.makeLong(true));
       
   473     }
       
   474 
       
   475     @Override
       
   476     public final OptionalLong findAny() {
       
   477         return evaluate(FindOps.makeLong(false));
       
   478     }
       
   479 
       
   480     @Override
       
   481     public final long[] toArray() {
       
   482         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new)).asLongArray();
       
   483     }
       
   484 
       
   485 
       
   486     //
       
   487 
       
   488     /**
       
   489      * Source stage of a LongPipeline.
       
   490      *
       
   491      * @param <E_IN> type of elements in the upstream source
       
   492      * @since 1.8
       
   493      */
       
   494     static class Head<E_IN> extends LongPipeline<E_IN> {
       
   495         /**
       
   496          * Constructor for the source stage of a LongStream.
       
   497          *
       
   498          * @param source {@code Supplier<Spliterator>} describing the stream
       
   499          *               source
       
   500          * @param sourceFlags the source flags for the stream source, described
       
   501          *                    in {@link StreamOpFlag}
       
   502          * @param parallel {@code true} if the pipeline is parallel
       
   503          */
       
   504         Head(Supplier<? extends Spliterator<Long>> source,
       
   505              int sourceFlags, boolean parallel) {
       
   506             super(source, sourceFlags, parallel);
       
   507         }
       
   508 
       
   509         /**
       
   510          * Constructor for the source stage of a LongStream.
       
   511          *
       
   512          * @param source {@code Spliterator} describing the stream source
       
   513          * @param sourceFlags the source flags for the stream source, described
       
   514          *                    in {@link StreamOpFlag}
       
   515          * @param parallel {@code true} if the pipeline is parallel
       
   516          */
       
   517         Head(Spliterator<Long> source,
       
   518              int sourceFlags, boolean parallel) {
       
   519             super(source, sourceFlags, parallel);
       
   520         }
       
   521 
       
   522         @Override
       
   523         final boolean opIsStateful() {
       
   524             throw new UnsupportedOperationException();
       
   525         }
       
   526 
       
   527         @Override
       
   528         final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
       
   529             throw new UnsupportedOperationException();
       
   530         }
       
   531 
       
   532         // Optimized sequential terminal operations for the head of the pipeline
       
   533 
       
   534         @Override
       
   535         public void forEach(LongConsumer action) {
       
   536             if (!isParallel()) {
       
   537                 adapt(sourceStageSpliterator()).forEachRemaining(action);
       
   538             } else {
       
   539                 super.forEach(action);
       
   540             }
       
   541         }
       
   542 
       
   543         @Override
       
   544         public void forEachOrdered(LongConsumer action) {
       
   545             if (!isParallel()) {
       
   546                 adapt(sourceStageSpliterator()).forEachRemaining(action);
       
   547             } else {
       
   548                 super.forEachOrdered(action);
       
   549             }
       
   550         }
       
   551     }
       
   552 
       
   553     /** Base class for a stateless intermediate stage of a LongStream.
       
   554      *
       
   555      * @param <E_IN> type of elements in the upstream source
       
   556      * @since 1.8
       
   557      */
       
   558     abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
       
   559         /**
       
   560          * Construct a new LongStream by appending a stateless intermediate
       
   561          * operation to an existing stream.
       
   562          * @param upstream The upstream pipeline stage
       
   563          * @param inputShape The stream shape for the upstream pipeline stage
       
   564          * @param opFlags Operation flags for the new stage
       
   565          */
       
   566         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
       
   567                     StreamShape inputShape,
       
   568                     int opFlags) {
       
   569             super(upstream, opFlags);
       
   570             assert upstream.getOutputShape() == inputShape;
       
   571         }
       
   572 
       
   573         @Override
       
   574         final boolean opIsStateful() {
       
   575             return false;
       
   576         }
       
   577     }
       
   578 
       
   579     /**
       
   580      * Base class for a stateful intermediate stage of a LongStream.
       
   581      *
       
   582      * @param <E_IN> type of elements in the upstream source
       
   583      * @since 1.8
       
   584      */
       
   585     abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
       
   586         /**
       
   587          * Construct a new LongStream by appending a stateful intermediate
       
   588          * operation to an existing stream.
       
   589          * @param upstream The upstream pipeline stage
       
   590          * @param inputShape The stream shape for the upstream pipeline stage
       
   591          * @param opFlags Operation flags for the new stage
       
   592          */
       
   593         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
       
   594                    StreamShape inputShape,
       
   595                    int opFlags) {
       
   596             super(upstream, opFlags);
       
   597             assert upstream.getOutputShape() == inputShape;
       
   598         }
       
   599 
       
   600         @Override
       
   601         final boolean opIsStateful() {
       
   602             return true;
       
   603         }
       
   604 
       
   605         @Override
       
   606         abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
       
   607                                                       Spliterator<P_IN> spliterator,
       
   608                                                       IntFunction<Long[]> generator);
       
   609     }
       
   610 }