/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/**
 * <h1>java.util.stream</h1>
 *
 * Classes to support functional-style operations on streams of values, as in the following:
 *
 * <pre>{@code
 *     int sumOfWeights = blocks.stream().filter(b -> b.getColor() == RED)
 *                                       .mapToInt(b -> b.getWeight())
 *                                       .sum();
 * }</pre>
 *
 * <p>Here we use {@code blocks}, which might be a {@code Collection}, as a source for a stream,
 * and then perform a filter-map-reduce ({@code sum()} is an example of a
 * <a href="package-summary.html#Reduction">reduction</a> operation) on the stream to obtain
 * the sum of the weights of the red blocks.
 *
 * <p>The key abstraction used in this approach is {@link java.util.stream.Stream}, as well as
 * its primitive specializations {@link java.util.stream.IntStream},
 * {@link java.util.stream.LongStream}, and {@link java.util.stream.DoubleStream}.
 * Streams differ from Collections in several ways:
 *
 * <ul>
 *     <li>No storage.  A stream is not a data structure that stores elements; instead, it
 *     conveys values from a source (which could be a data structure, a generator, an IO
 *     channel, etc.) through a pipeline of computational operations.</li>
 *     <li>Functional in nature.  An operation on a stream produces a result, but does not
 *     modify its underlying data source.  For example, filtering a {@code Stream} produces a
 *     new {@code Stream}, rather than removing elements from the underlying source.</li>
 *     <li>Laziness-seeking.  Many stream operations, such as filtering, mapping, or duplicate
 *     removal, can be implemented lazily, exposing opportunities for optimization.  (For
 *     example, "find the first {@code String} matching a pattern" need not examine all the
 *     input strings.)  Stream operations are divided into intermediate
 *     ({@code Stream}-producing) operations and terminal (value-producing) operations; all
 *     intermediate operations are lazy.</li>
 *     <li>Possibly unbounded.  While collections have a finite size, streams need not.
 *     Operations such as {@code limit(n)} or {@code findFirst()} can allow computations on
 *     infinite streams to complete in finite time.</li>
 * </ul>
 *
 * <h2><a name="StreamPipelines">Stream pipelines</a></h2>
 *
 * <p>Streams are used to create <em>pipelines</em> of
 * <a href="package-summary.html#StreamOps">operations</a>.  A complete stream pipeline has
 * several components: a source (which may be a {@code Collection}, an array, a generator
 * function, or an IO channel); zero or more <em>intermediate operations</em> such as
 * {@code Stream.filter} or {@code Stream.map}; and a <em>terminal operation</em> such as
 * {@code Stream.forEach} or {@code Stream.reduce}.  Stream operations may take as parameters
 * <em>function values</em> (which are often lambda expressions, but could be method references
 * or objects) which parameterize the behavior of the operation, such as a {@code Predicate}
 * passed to the {@code Stream.filter} method.
 *
 * <p>Intermediate operations return a new {@code Stream}.  They are lazy; executing an
 * intermediate operation such as {@link java.util.stream.Stream#filter Stream.filter} does
 * not actually perform any filtering, instead creating a new {@code Stream} that, when
 * traversed, contains the elements of the initial {@code Stream} that match the
 * given {@code Predicate}.  Consuming elements from the stream source does not
 * begin until the terminal operation is executed.
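 *
 * <p>To illustrate (assuming the {@code blocks} collection from the earlier example), the
 * following builds up a pipeline but consumes nothing from the source; traversal begins only
 * when the terminal {@code sum()} operation is invoked:
 * <pre>{@code
 *     IntStream weights = blocks.stream()
 *                               .filter(b -> b.getColor() == RED)
 *                               .mapToInt(b -> b.getWeight());
 *     // nothing has been filtered or mapped yet
 *     int sumOfWeights = weights.sum();  // terminal operation triggers traversal
 * }</pre>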
 *
 * <p>Terminal operations consume the {@code Stream} and produce a result or a side-effect.
 * After a terminal operation is performed, the stream can no longer be used and you must
 * return to the data source, or select a new data source, to get a new stream.  For example,
 * obtaining the sum of weights of all red blocks, and then of all blue blocks, requires a
 * filter-map-reduce on two different streams:
 * <pre>{@code
 *     int sumOfRedWeights  = blocks.stream().filter(b -> b.getColor() == RED)
 *                                           .mapToInt(b -> b.getWeight())
 *                                           .sum();
 *     int sumOfBlueWeights = blocks.stream().filter(b -> b.getColor() == BLUE)
 *                                           .mapToInt(b -> b.getWeight())
 *                                           .sum();
 * }</pre>
 *
 * <p>However, there are other techniques that allow you to obtain both results in a single
 * pass if multiple traversal is impractical or inefficient.  TODO provide link
 *
 * <h3><a name="StreamOps">Stream operations</a></h3>
 *
 * <p>Intermediate stream operations (such as {@code filter} or {@code sorted}) always produce
 * a new {@code Stream}, and are always <em>lazy</em>.  Executing a lazy operation does not
 * trigger processing of the stream contents; all processing is deferred until the terminal
 * operation commences.  Processing streams lazily allows for significant efficiencies; in a
 * pipeline such as the filter-map-sum example above, filtering, mapping, and addition can be
 * fused into a single pass, with minimal intermediate state.  Laziness also enables us to avoid
 * examining all the data when it is not necessary; for operations such as "find the first
 * string longer than 1000 characters", one need not examine all the input strings, just enough
 * to find one that has the desired characteristics.  (This behavior becomes even more important
 * when the input stream is infinite and not merely large.)
 *
 * <p>Intermediate operations are further divided into <em>stateless</em> and <em>stateful</em>
 * operations.  Stateless operations retain no state from previously seen values when processing
 * a new value; examples of stateless intermediate operations include {@code filter} and
 * {@code map}.  Stateful operations may incorporate state from previously seen elements in
 * processing new values; examples of stateful intermediate operations include {@code distinct}
 * and {@code sorted}.  Stateful operations may need to process the entire input before
 * producing a result; for example, one cannot produce any results from sorting a stream until
 * one has seen all elements of the stream.  As a result, under parallel computation, some
 * pipelines containing stateful intermediate operations have to be executed in multiple passes.
 * Pipelines containing exclusively stateless intermediate operations can be processed in a
 * single pass, whether sequential or parallel.
 *
 * <p>Further, some operations are deemed <em>short-circuiting</em> operations.  An intermediate
 * operation is short-circuiting if, when presented with infinite input, it may produce a
 * finite stream as a result.  A terminal operation is short-circuiting if, when presented with
 * infinite input, it may terminate in finite time.  (Having a short-circuiting operation is a
 * necessary, but not sufficient, condition for the processing of an infinite stream to
 * terminate normally in finite time.)
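 *
 * <p>For example, the "first string longer than 1000 characters" query mentioned above
 * combines a lazy intermediate operation with a short-circuiting terminal operation, so only
 * as many strings as are needed to find a match are examined (a {@code strings} collection is
 * assumed here for illustration):
 * <pre>{@code
 *     Optional<String> first = strings.stream()
 *                                     .filter(s -> s.length() > 1000)
 *                                     .findFirst();
 * }</pre>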
 *
 * Terminal operations (such as {@code forEach} or {@code findFirst}) are always eager
 * (they execute completely before returning), and produce a non-{@code Stream} result, such
 * as a primitive value or a {@code Collection}, or have side-effects.
 *
 * <h3>Parallelism</h3>
 *
 * <p>By recasting aggregate operations as a pipeline of operations on a stream of values, many
 * aggregate operations can be more easily parallelized.  A {@code Stream} can execute either
 * in serial or in parallel.  When streams are created, they are either created as sequential
 * or parallel streams; the parallel-ness of a stream can also be switched by the
 * {@link java.util.stream.Stream#sequential()} and {@link java.util.stream.Stream#parallel()}
 * operations.  The {@code Stream} implementations in the JDK create serial streams unless
 * parallelism is explicitly requested.  For example, {@code Collection} has methods
 * {@link java.util.Collection#stream} and {@link java.util.Collection#parallelStream},
 * which produce sequential and parallel streams respectively; other stream-bearing methods
 * such as {@link java.util.stream.Streams#intRange(int, int)} produce sequential
 * streams, but these can be efficiently parallelized by calling {@code parallel()} on the
 * result.  The set of operations on serial and parallel streams is identical.  To execute the
 * "sum of weights of blocks" query in parallel, we would do:
 *
 * <pre>{@code
 *     int sumOfWeights = blocks.parallelStream().filter(b -> b.getColor() == RED)
 *                                               .mapToInt(b -> b.getWeight())
 *                                               .sum();
 * }</pre>
 *
 * <p>The only difference between the serial and parallel versions of this example code is
 * the creation of the initial {@code Stream}.  Whether a {@code Stream} will execute in serial
 * or parallel can be determined with the
 * {@link java.util.stream.Stream#isParallel() isParallel()} method.  When the terminal
 * operation is initiated, the entire stream pipeline is either executed sequentially or in
 * parallel, determined by the last operation that affected the stream's serial-parallel
 * orientation (which could be the stream source, or the {@code sequential()} or
 * {@code parallel()} methods).
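 *
 * <p>For example, a sequential stream such as one produced by {@code Streams.intRange}
 * (mentioned above) can be explicitly parallelized before the terminal operation runs:
 * <pre>{@code
 *     int sumOfSquares = Streams.intRange(0, 100).parallel()
 *                               .map(x -> x * x)
 *                               .sum();
 * }</pre>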
 *
 * <p>In order for the results of parallel operations to be deterministic and consistent with
 * their serial equivalents, the function values passed into the various stream operations
 * should be <a href="#Non-Interference"><em>stateless</em></a>.
 *
 * <h3><a name="Ordering">Ordering</a></h3>
 *
 * <p>Streams may or may not have an <em>encounter order</em>.  An encounter
 * order specifies the order in which elements are provided by the stream to the
 * operations pipeline.  Whether or not there is an encounter order depends on
 * the source, the intermediate operations, and the terminal operation.
 * Certain stream sources (such as {@code List} or arrays) are intrinsically
 * ordered, whereas others (such as {@code HashSet}) are not.  Some intermediate
 * operations may impose an encounter order on an otherwise unordered stream,
 * such as {@link java.util.stream.Stream#sorted()}, and others may render an
 * ordered stream unordered (such as {@link java.util.stream.Stream#unordered()}).
 * Some terminal operations may ignore encounter order, such as
 * {@link java.util.stream.Stream#forEach}.
 *
 * <p>If a stream is ordered, most operations are constrained to operate on the
 * elements in their encounter order; if the source of a stream is a {@code List}
 * containing {@code [1, 2, 3]}, then the result of executing {@code map(x -> x*2)}
 * must be {@code [2, 4, 6]}.  However, if the source has no defined encounter
 * order, then any of the six permutations of the values {@code [2, 4, 6]} would
 * be a valid result.  Many operations can still be efficiently parallelized even
 * under ordering constraints.
 *
 * <p>For sequential streams, ordering is only relevant to the determinism
 * of operations performed repeatedly on the same source.  (An {@code ArrayList}
 * is constrained to iterate elements in order; a {@code HashSet} is not, and
 * repeated iteration might produce a different order.)
 *
 * <p>For parallel streams, relaxing the ordering constraint can enable
 * optimized implementation for some operations.  For example, duplicate
 * removal on an ordered stream must completely process the first partition
 * before it can return any elements from a subsequent partition, even if those
 * elements are available earlier.  On the other hand, without the constraint of
 * ordering, duplicate removal can be done more efficiently, using a shared
 * concurrent set (such as one backed by {@code ConcurrentHashMap}).  There will
 * be cases where the stream is structurally ordered (the source is ordered and
 * the intermediate operations are order-preserving), but the user does not
 * particularly care about the encounter order.  In some cases, explicitly
 * de-ordering the stream with the {@link java.util.stream.Stream#unordered()}
 * method may result in improved parallel performance for some stateful or
 * terminal operations.
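 *
 * <p>For instance, when encounter order does not matter, explicitly de-ordering a parallel
 * stream before a stateful operation such as {@code distinct} may permit a more efficient
 * implementation (the {@code blocks} collection from the earlier examples is assumed):
 * <pre>{@code
 *     List<Block> distinctBlocks = blocks.parallelStream()
 *                                        .unordered()
 *                                        .distinct()
 *                                        .collect(Collectors.toList());
 * }</pre>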
 *
 * <h3><a name="Non-Interference">Non-interference</a></h3>
 *
 * The {@code java.util.stream} package enables you to execute possibly-parallel
 * bulk-data operations over a variety of data sources, including even non-thread-safe
 * collections such as {@code ArrayList}.  This is possible only if we can
 * prevent <em>interference</em> with the data source during the execution of a
 * stream pipeline.  (Execution begins when the terminal operation is invoked, and ends
 * when the terminal operation completes.)  For most data sources, preventing interference
 * means ensuring that the data source is <em>not modified at all</em> during the execution
 * of the stream pipeline.  (Some data sources, such as concurrent collections, are
 * specifically designed to handle concurrent modification.)
 *
 * <p>Accordingly, lambda expressions (or other objects implementing the appropriate functional
 * interface) passed to stream methods should never modify the stream's data source.  An
 * implementation is said to <em>interfere</em> with the data source if it modifies, or causes
 * to be modified, the stream's data source.  The need for non-interference applies to all
 * pipelines, not just parallel ones.  Unless the stream source is concurrent, modifying a
 * stream's data source during execution of a stream pipeline can cause exceptions, incorrect
 * answers, or non-conformant results.
 *
 * <p>Further, results may be nondeterministic or incorrect if the lambda expressions passed to
 * stream operations are <em>stateful</em>.  A stateful lambda (or other object implementing the
 * appropriate functional interface) is one whose result depends on any state which might change
 * during the execution of the stream pipeline.  An example of a stateful lambda is:
 * <pre>{@code
 *     Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
 *     stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
 * }</pre>
 * Here, if the mapping operation is performed in parallel, the results for the same input
 * could vary from run to run, due to thread scheduling differences, whereas, with a stateless
 * lambda expression the results would always be the same.
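 *
 * <p>A stateless (and therefore deterministic) alternative is usually available; for example,
 * if the goal of the computation above is to suppress duplicates, the library-provided
 * {@code distinct()} operation manages any required state internally:
 * <pre>{@code
 *     stream.parallel().distinct()...
 * }</pre>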
 *
 * <h3>Side-effects</h3>
 *
 * <h2><a name="Reduction">Reduction operations</a></h2>
 *
 * A <em>reduction</em> operation takes a stream of elements and processes them in a way
 * that reduces to a single value or summary description, such as finding the sum or maximum
 * of a set of numbers.  (In more complex scenarios, the reduction operation might need to
 * extract data from the elements before reducing that data to a single value, such as
 * finding the sum of weights of a set of blocks.  This would require extracting the weight
 * from each block before summing up the weights.)
 *
 * <p>Of course, such operations can be readily implemented as simple sequential loops, as in:
 * <pre>{@code
 *     int sum = 0;
 *     for (int x : numbers) {
 *         sum += x;
 *     }
 * }</pre>
 * However, there may be a significant advantage to preferring a
 * {@link java.util.stream.Stream#reduce reduce operation}
 * over a mutative accumulation such as the above -- a properly constructed reduce operation is
 * inherently parallelizable, so long as the
 * {@link java.util.function.BinaryOperator reduction operator}
 * has the right characteristics.  Specifically, the operator must be
 * <a href="#Associativity">associative</a>.  For example, given a
 * stream of numbers for which we want to find the sum, we can write:
 * <pre>{@code
 *     int sum = numbers.reduce(0, (x,y) -> x+y);
 * }</pre>
 * or more succinctly:
 * <pre>{@code
 *     int sum = numbers.reduce(0, Integer::sum);
 * }</pre>
 *
 * <p>(The primitive specializations of {@link java.util.stream.Stream}, such as
 * {@link java.util.stream.IntStream}, even have convenience methods for common reductions,
 * such as {@link java.util.stream.IntStream#sum() sum} and
 * {@link java.util.stream.IntStream#max() max}, which are implemented as simple wrappers
 * around reduce.)
 *
 * <p>Reduction parallelizes well, since the implementation of {@code reduce} can operate on
 * subsets of the stream in parallel, and then combine the intermediate results to get the final
 * correct answer.  Even if you were to use a parallelizable form of the
 * {@link java.util.stream.Stream#forEach(java.util.function.Consumer) forEach()} method
 * in place of the original for-each loop above, you would still have to provide thread-safe
 * updates to the shared accumulating variable {@code sum}, and the required synchronization
 * would likely eliminate any performance gain from parallelism.  Using a {@code reduce} method
 * instead removes all of the burden of parallelizing the reduction operation, and the library
 * can provide an efficient parallel implementation with no additional synchronization needed.
 *
 * <p>The "blocks" examples shown earlier show how reduction combines with other operations
 * to replace for-loops with bulk operations.  If {@code blocks} is a collection of {@code Block}
 * objects, which have a {@code getWeight} method, we can find the heaviest block with:
 * <pre>{@code
 *     OptionalInt heaviest = blocks.stream()
 *                                  .mapToInt(Block::getWeight)
 *                                  .reduce(Integer::max);
 * }</pre>
 *
 * <p>In its more general form, a {@code reduce} operation on elements of type {@code <T>}
 * yielding a result of type {@code <U>} requires three parameters:
 * <pre>{@code
 *     <U> U reduce(U identity,
 *                  BiFunction<U, ? super T, U> accumulator,
 *                  BinaryOperator<U> combiner);
 * }</pre>
 * Here, the <em>identity</em> element is both an initial seed for the reduction, and a default
 * result if there are no elements.  The <em>accumulator</em> function takes a partial result
 * and the next element, and produces a new partial result.  The <em>combiner</em> function
 * combines the partial results of two accumulators to produce a new partial result, and
 * eventually the final result.
 *
 * <p>This form is a generalization of the two-argument form, and is also a generalization of
 * the map-reduce construct illustrated above.  If we wanted to re-cast the simple {@code sum}
 * example using the more general form, {@code 0} would be the identity element, while
 * {@code Integer::sum} would be both the accumulator and combiner.  For the sum-of-weights
 * example, this could be re-cast as:
 * <pre>{@code
 *     int sumOfWeights = blocks.stream().reduce(0,
 *                                               (sum, b) -> sum + b.getWeight(),
 *                                               Integer::sum);
 * }</pre>
 * though the map-reduce form is more readable and generally preferable.  The generalized form
 * is provided for cases where significant work can be optimized away by combining mapping and
 * reducing into a single function.
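 *
 * <p>As another illustration of the fused form, given a stream of strings, the total number
 * of characters can be computed without first mapping to a stream of lengths (a
 * {@code strings} stream of {@code String} elements is assumed here for illustration):
 * <pre>{@code
 *     int totalChars = strings.reduce(0,
 *                                     (length, s) -> length + s.length(),
 *                                     Integer::sum);
 * }</pre>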
 *
 * <p>More formally, the {@code identity} value must be an <em>identity</em> for the combiner
 * function.  This means that for all {@code u}, {@code combiner.apply(identity, u)} is equal
 * to {@code u}.  Additionally, the {@code combiner} function must be
 * <a href="#Associativity">associative</a> and must be compatible with the {@code accumulator}
 * function; for all {@code u} and {@code t}, the following must hold:
 * <pre>{@code
 *     combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
 * }</pre>
 *
 * <h3><a name="MutableReduction">Mutable Reduction</a></h3>
 *
 * A <em>mutable</em> reduction operation is similar to an ordinary reduction, in that it
 * reduces a stream of values to a single value, but instead of producing a distinct
 * single-valued result, it mutates a general <em>result container</em>, such as a
 * {@code Collection} or {@code StringBuilder}, as it processes the elements in the stream.
 *
 * <p>For example, if we wanted to take a stream of strings and concatenate them into a single
 * long string, we <em>could</em> achieve this with ordinary reduction:
 * <pre>{@code
 *     String concatenated = strings.reduce("", String::concat);
 * }</pre>
 *
 * We would get the desired result, and it would even work in parallel.  However, we might not
 * be happy about the performance!  Such an implementation would do a great deal of string
 * copying, and the run time would be <em>O(n^2)</em> in the number of elements.  A more
 * performant approach would be to accumulate the results into a
 * {@link java.lang.StringBuilder}, which is a mutable container for accumulating strings.
 * We can use the same technique to parallelize mutable reduction as we do with ordinary
 * reduction.
 *
 * <p>The mutable reduction operation is called
 * {@link java.util.stream.Stream#collect(Collector) collect()}, as it
 * collects together the desired results into a result container such as {@code StringBuilder}.
 * A {@code collect} operation requires three things: a factory function which will construct
 * new instances of the result container, an accumulating function that will update a result
 * container by incorporating a new element, and a combining function that can take two
 * result containers and merge their contents.  The form of this is very similar to the general
 * form of ordinary reduction:
 * <pre>{@code
 *     <R> R collect(Supplier<R> resultFactory,
 *                   BiConsumer<R, ? super T> accumulator,
 *                   BiConsumer<R, R> combiner);
 * }</pre>
 * As with {@code reduce()}, the benefit of expressing {@code collect} in this abstract way is
 * that it is directly amenable to parallelization: we can accumulate partial results in
 * parallel and then combine them.  For example, to collect the String representations of the
 * elements in a stream into an {@code ArrayList}, we could write the obvious sequential
 * for-each form:
 * <pre>{@code
 *     ArrayList<String> strings = new ArrayList<>();
 *     for (T element : stream) {
 *         strings.add(element.toString());
 *     }
 * }</pre>
 * Or we could use a parallelizable collect form:
 * <pre>{@code
 *     ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
 *                                                (c, e) -> c.add(e.toString()),
 *                                                (c1, c2) -> c1.addAll(c2));
 * }</pre>
 * or, noting that we have buried a mapping operation inside the accumulator function, more
 * succinctly as:
 * <pre>{@code
 *     ArrayList<String> strings = stream.map(Object::toString)
 *                                       .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
 * }</pre>
 * Here, our supplier is just the {@link java.util.ArrayList#ArrayList() ArrayList constructor},
 * the accumulator adds the stringified element to an {@code ArrayList}, and the combiner simply
 * uses {@link java.util.ArrayList#addAll addAll} to copy the strings from one container into
 * the other.
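 *
 * <p>Similarly, the string-concatenation example above can be re-cast as a mutable reduction
 * into a {@code StringBuilder}, avoiding the quadratic string copying of the {@code reduce}
 * version:
 * <pre>{@code
 *     String concatenated = strings.collect(StringBuilder::new,
 *                                           StringBuilder::append,
 *                                           StringBuilder::append)
 *                                  .toString();
 * }</pre>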
 *
 * <p>As with the regular reduction operation, the ability to parallelize only comes if an
 * <a href="package-summary.html#Associativity">associativity</a> condition is met.  The
 * {@code combiner} is associative if for result containers {@code r1}, {@code r2}, and
 * {@code r3}:
 * <pre>{@code
 *     combiner.accept(r1, r2);
 *     combiner.accept(r1, r3);
 * }</pre>
 * is equivalent to
 * <pre>{@code
 *     combiner.accept(r2, r3);
 *     combiner.accept(r1, r2);
 * }</pre>
 * where equivalence means that {@code r1} is left in the same state (according to the meaning
 * of {@link java.lang.Object#equals equals} for the element types).  Similarly, the
 * {@code resultFactory} must act as an <em>identity</em> with respect to the {@code combiner},
 * so that for any result container {@code r}:
 * <pre>{@code
 *     combiner.accept(r, resultFactory.get());
 * }</pre>
 * does not modify the state of {@code r} (again according to the meaning of
 * {@link java.lang.Object#equals equals}).  Finally, the {@code accumulator} and
 * {@code combiner} must be compatible, such that for a result container {@code r}, a
 * freshly-created result container {@code r2}, and an element {@code t}:
 * <pre>{@code
 *     r2 = resultFactory.get();
 *     accumulator.accept(r2, t);
 *     combiner.accept(r, r2);
 * }</pre>
 * is equivalent to:
 * <pre>{@code
 *     accumulator.accept(r, t);
 * }</pre>
 * where equivalence means that {@code r} is left in the same state (again according to the
 * meaning of {@link java.lang.Object#equals equals}).
 *
 * <p>The three aspects of {@code collect} -- supplier, accumulator, and combiner -- are often
 * very tightly coupled, and it is convenient to introduce the notion of a
 * {@link java.util.stream.Collector} as being an object that embodies all three aspects.
 * There is a {@link java.util.stream.Stream#collect(Collector) collect} method
 * that simply takes a {@code Collector} and returns the resulting container.
 * The above example for collecting strings into a {@code List} can be rewritten using a
 * standard {@code Collector} as:
 * <pre>{@code
 *     List<String> strings = stream.map(Object::toString)
 *                                  .collect(Collectors.toList());
 * }</pre>
 *
|
|
435 |
* <h3><a name="ConcurrentReduction">Reduction, Concurrency, and Ordering</a></h3>
|
|
436 |
*
|
|
437 |
* With some complex reduction operations, for example a collect that produces a
|
|
438 |
* {@code Map}, such as:
|
|
439 |
* <pre>{@code
|
|
440 |
* Map<Buyer, List<Transaction>> salesByBuyer
|
|
441 |
* = txns.parallelStream()
|
|
442 |
* .collect(Collectors.groupingBy(Transaction::getBuyer));
|
|
443 |
* }</pre>
|
|
444 |
* (where {@link java.util.stream.Collectors#groupingBy} is a utility function
|
|
445 |
* that returns a {@link java.util.stream.Collector} for grouping sets of elements based on some key)
|
|
446 |
* it may actually be counterproductive to perform the operation in parallel.
|
|
447 |
 * This is because the combining step (merging one {@code Map} into another by key)
 * can be expensive for some {@code Map} implementations.
 *
 * <p>Suppose, however, that the result container used in this reduction
 * were a concurrently modifiable collection -- such as a
 * {@link java.util.concurrent.ConcurrentHashMap ConcurrentHashMap}. In that case,
 * the parallel invocations of the accumulator could deposit their results
 * concurrently into the same shared result container, eliminating the need for
 * the combiner to merge distinct result containers. This potentially provides a
 * boost to parallel execution performance. We call this a <em>concurrent</em> reduction.
 *
 * <p>A {@link java.util.stream.Collector} that supports concurrent reduction is marked with the
 * {@link java.util.stream.Collector.Characteristics#CONCURRENT} characteristic.
 * Having a concurrent collector is a necessary condition for performing a
 * concurrent reduction, but that alone is not sufficient. If you imagine multiple
 * accumulators depositing results into a shared container, the order in which
 * results are deposited is non-deterministic. Consequently, a concurrent reduction
 * is possible only if ordering is not important for the stream being processed.
 * The {@link java.util.stream.Stream#collect(Collector)}
 * implementation will only perform a concurrent reduction if:
 * <ul>
 * <li>The stream is parallel;</li>
 * <li>The collector has the
 * {@link java.util.stream.Collector.Characteristics#CONCURRENT} characteristic;
 * and</li>
 * <li>Either the stream is unordered, or the collector has the
 * {@link java.util.stream.Collector.Characteristics#UNORDERED} characteristic.</li>
 * </ul>
 * For example:
 * <pre>{@code
 *     Map<Buyer, List<Transaction>> salesByBuyer
 *         = txns.parallelStream()
 *               .unordered()
 *               .collect(groupingByConcurrent(Transaction::getBuyer));
 * }</pre>
 * (where {@link java.util.stream.Collectors#groupingByConcurrent} is the concurrent companion
 * to {@code groupingBy}).
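The three conditions above can be exercised with the APIs that shipped in the final JDK 8 release. This sketch, using hypothetical word data, contrasts the merge-based {@code groupingBy} reduction with the concurrent {@code groupingByConcurrent} reduction; both produce the same groupings, though per-key encounter order is guaranteed only by the former.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class ConcurrentGrouping {
    public static void main(String[] args) {
        // Hypothetical sample data: words to be grouped by first letter.
        List<String> words = Arrays.asList("apple", "avocado", "banana", "blueberry", "cherry");

        // Merge-based parallel reduction: each thread fills its own Map,
        // and the combiner merges those maps key by key.
        Map<Character, List<String>> merged = words.parallelStream()
                .collect(Collectors.groupingBy(w -> w.charAt(0)));

        // Concurrent reduction: the stream is parallel, the collector has the
        // CONCURRENT characteristic, and the stream is made unordered, so all
        // threads deposit into one shared ConcurrentMap with no merging step.
        ConcurrentMap<Character, List<String>> concurrent = words.parallelStream()
                .unordered()
                .collect(Collectors.groupingByConcurrent(w -> w.charAt(0)));

        // The two groupings agree on keys and on the members of each group.
        System.out.println(merged.keySet().equals(concurrent.keySet())); // prints "true"
    }
}
```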
 *
 * <p>Note that if it is important that the elements for a given key appear in the
 * order they appear in the source, then we cannot use a concurrent reduction,
 * as ordering is one of the casualties of concurrent insertion. We would then
 * be constrained to implement either a sequential reduction or a merge-based
 * parallel reduction.
 *
 * <h2><a name="Associativity">Associativity</a></h2>
 *
 * An operator or function {@code op} is <em>associative</em> if the following holds:
 * <pre>{@code
 *     (a op b) op c == a op (b op c)
 * }</pre>
 * The importance of this to parallel evaluation can be seen if we expand this to four terms:
 * <pre>{@code
 *     a op b op c op d == (a op b) op (c op d)
 * }</pre>
 * So we can evaluate {@code (a op b)} in parallel with {@code (c op d)}, and then invoke
 * {@code op} on the results.
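A minimal sketch of why this matters: with an associative accumulator such as {@code Integer::sum}, sequential and parallel reductions are guaranteed to agree, because the runtime may split the input anywhere and combine the partial results. A non-associative function such as subtraction offers no such guarantee.

```java
import java.util.Arrays;
import java.util.List;

public class AssociativityDemo {
    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(1, 2, 3, 4);

        // Addition is associative: (1 + 2) + (3 + 4) == ((1 + 2) + 3) + 4,
        // so the runtime may compute (1 + 2) and (3 + 4) in parallel and
        // then combine the partial sums.
        int sequential = nums.stream().reduce(0, Integer::sum);
        int parallel = nums.parallelStream().reduce(0, Integer::sum);
        System.out.println(sequential == parallel); // prints "true"; both are 10

        // Subtraction is NOT associative: (1 - 2) - 3 != 1 - (2 - 3), so
        // reduce(0, (a, b) -> a - b) on a parallel stream would depend on
        // how the input happened to be split, and must be avoided.
    }
}
```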
 * TODO what does associative mean for mutative combining functions?
 * FIXME: we described mutative associativity above.
 *
 * <h2><a name="StreamSources">Stream sources</a></h2>
 * TODO where does this section go?
 *
 * XXX - change this section to stream construction, gradually introducing more
 * complex ways to construct:
 *     - construction from Collection
 *     - construction from Iterator
 *     - construction from array
 *     - construction from generators
 *     - construction from spliterator
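The construction routes listed above can be sketched with the APIs that shipped in the final JDK 8 release (the draft's {@code java.util.stream.Streams} helper did not survive; static factories on {@code Stream}, {@code Arrays}, and {@code StreamSupport} took its place):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamSources {
    public static void main(String[] args) {
        // From a Collection
        List<String> letters = Arrays.asList("a", "b", "c");
        Stream<String> fromCollection = letters.stream();

        // From an Iterator, by wrapping it in a spliterator of unknown size
        Stream<String> fromIterator = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(letters.iterator(), 0), false);

        // From an array
        IntStream fromArray = Arrays.stream(new int[] {1, 2, 3});

        // From a generator (an infinite stream, so it must be limited)
        Stream<Integer> fromGenerator = Stream.iterate(0, n -> n + 2).limit(3);

        System.out.println(fromArray.sum());        // prints "6"
        System.out.println(fromCollection.count()); // prints "3"
        System.out.println(fromIterator.count());   // prints "3"
        System.out.println(fromGenerator.count());  // prints "3"
    }
}
```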
 *
 * XXX - the following is a quite low-level but important aspect of stream construction
 *
 * <p>A pipeline is initially constructed from a spliterator (see {@link java.util.Spliterator})
 * supplied by a stream source. The spliterator covers the elements of the source and provides
 * element traversal operations for a possibly-parallel computation. See methods on
 * {@link java.util.stream.Streams} for construction of pipelines using spliterators.
 *
 * <p>A source may directly supply a spliterator. If so, the spliterator is traversed, split, or
 * queried for estimated size after, and never before, the terminal operation commences. It is
 * strongly recommended that the spliterator report a characteristic of {@code IMMUTABLE} or
 * {@code CONCURRENT}, or be <em>late-binding</em> and not bind to the elements it covers until
 * traversed, split, or queried for estimated size.
 *
 * <p>If a source cannot directly supply a recommended spliterator then it may indirectly supply
 * one using a {@code Supplier}. The spliterator is obtained from the supplier after, and never
 * before, the terminal operation of the stream pipeline commences.
 *
 * <p>Such requirements significantly reduce the scope of potential interference to the interval
 * starting with the commencement of the terminal operation and ending with the production of a
 * result or side-effect. See
 * <a href="package-summary.html#Non-Interference">Non-Interference</a> for more details.
 *
 * XXX - move the following to the non-interference section
 *
 * <p>A source can be modified before the terminal operation commences, and those modifications
 * will be reflected in the covered elements. Afterwards, and depending on the properties of the
 * source, further modifications might not be reflected, and a
 * {@code ConcurrentModificationException} may be thrown.
 *
 * <p>For example, consider the following code:
 * <pre>{@code
 *     List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
 *     Stream<String> sl = l.stream();
 *     l.add("three");
 *     String s = sl.collect(toStringJoiner(" ")).toString();
 * }</pre>
 * First a list is created consisting of two strings: "one" and "two". Then a stream is created
 * from that list. Next the list is modified by adding a third string: "three". Finally the
 * elements of the stream are collected and joined together. Since the list was modified before
 * the terminal {@code collect} operation commenced, the result will be the string
 * "one two three". However, if the list is modified after the terminal operation commences,
 * as in:
 * <pre>{@code
 *     List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
 *     Stream<String> sl = l.stream();
 *     String s = sl.peek(e -> l.add("BAD LAMBDA")).collect(toStringJoiner(" ")).toString();
 * }</pre>
 * then a {@code ConcurrentModificationException} will be thrown, since the {@code peek}
 * operation will attempt to add the string "BAD LAMBDA" to the list after the terminal
 * operation has commenced.
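Both snippets above can be run against the final JDK 8 API, where the draft's {@code toStringJoiner} collector became {@code Collectors.joining}. A sketch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InterferenceDemo {
    public static void main(String[] args) {
        // Modification BEFORE the terminal operation commences: reflected.
        List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
        Stream<String> sl = l.stream();
        l.add("three");
        String s = sl.collect(Collectors.joining(" "));
        System.out.println(s); // prints "one two three"

        // Modification DURING the terminal operation: interference, detected
        // by ArrayList's fail-fast spliterator.
        List<String> m = new ArrayList<>(Arrays.asList("one", "two"));
        try {
            m.stream().peek(e -> m.add("BAD LAMBDA")).collect(Collectors.joining(" "));
        } catch (ConcurrentModificationException expected) {
            System.out.println("caught ConcurrentModificationException");
        }
    }
}
```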
 */

package java.util.stream;