|
/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
|

/**
 * <h1>java.util.stream</h1>
 *
 * Classes to support functional-style operations on streams of values, as in the following:
 *
 * <pre>{@code
 *     int sumOfWeights = blocks.stream().filter(b -> b.getColor() == RED)
 *                                       .mapToInt(b -> b.getWeight())
 *                                       .sum();
 * }</pre>
 *
 * <p>Here we use {@code blocks}, which might be a {@code Collection}, as a source for a stream,
 * and then perform a filter-map-reduce ({@code sum()} is an example of a
 * <a href="package-summary.html#Reduction">reduction</a> operation) on the stream to obtain
 * the sum of the weights of the red blocks.
 *
 * <p>The key abstraction used in this approach is {@link java.util.stream.Stream}, as well as
 * its primitive specializations {@link java.util.stream.IntStream},
 * {@link java.util.stream.LongStream}, and {@link java.util.stream.DoubleStream}.
 * Streams differ from Collections in several ways:
 *
 * <ul>
 *     <li>No storage.  A stream is not a data structure that stores elements; instead, it
 *     carries values from a source (which could be a data structure, a generator, an IO
 *     channel, etc.) through a pipeline of computational operations.</li>
 *     <li>Functional in nature.  An operation on a stream produces a result, but does not
 *     modify its underlying data source.  For example, filtering a {@code Stream} produces a
 *     new {@code Stream}, rather than removing elements from the underlying source.</li>
 *     <li>Laziness-seeking.  Many stream operations, such as filtering, mapping, or duplicate
 *     removal, can be implemented lazily, exposing opportunities for optimization.  (For
 *     example, "find the first {@code String} matching a pattern" need not examine all the
 *     input strings.)  Stream operations are divided into intermediate
 *     ({@code Stream}-producing) operations and terminal (value-producing) operations; all
 *     intermediate operations are lazy.</li>
 *     <li>Possibly unbounded.  While collections have a finite size, streams need not.
 *     Operations such as {@code limit(n)} or {@code findFirst()} can allow computations on
 *     infinite streams to complete in finite time.</li>
 * </ul>
|
 *
 * <h2><a name="StreamPipelines">Stream pipelines</a></h2>
 *
 * <p>Streams are used to create <em>pipelines</em> of
 * <a href="package-summary.html#StreamOps">operations</a>.  A complete stream pipeline has
 * several components: a source (which may be a {@code Collection}, an array, a generator
 * function, or an IO channel); zero or more <em>intermediate operations</em> such as
 * {@code Stream.filter} or {@code Stream.map}; and a <em>terminal operation</em> such as
 * {@code Stream.forEach} or {@code Stream.reduce}.  Stream operations may take as parameters
 * <em>function values</em> (which are often lambda expressions, but could be method references
 * or objects) which parameterize the behavior of the operation, such as a {@code Predicate}
 * passed to the {@code Stream.filter} method.
 *
 * <p>Intermediate operations return a new {@code Stream}.  They are lazy; executing an
 * intermediate operation such as {@link java.util.stream.Stream#filter Stream.filter} does
 * not actually perform any filtering, instead creating a new {@code Stream} that, when
 * traversed, contains the elements of the initial {@code Stream} that match the given
 * {@code Predicate}.  Consuming elements from the stream source does not begin until the
 * terminal operation is executed.
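 *
 * <p>For example (a sketch; it assumes {@code strings} is a {@code Collection<String>}, and
 * the print statement exists only to make the laziness visible), merely building a pipeline
 * performs no work:
 * <pre>{@code
 *     Stream<String> s = strings.stream()
 *                               .filter(x -> { System.out.println("filtering: " + x);
 *                                              return x.length() > 1; });
 *     // Nothing has been printed yet, and no elements have been consumed.
 *     long count = s.count();  // The terminal operation triggers the filtering.
 * }</pre>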
|
 *
 * <p>Terminal operations consume the {@code Stream} and produce a result or a side-effect.
 * After a terminal operation is performed, the stream can no longer be used and you must
 * return to the data source, or select a new data source, to get a new stream.  For example,
 * obtaining the sum of weights of all red blocks, and then of all blue blocks, requires a
 * filter-map-reduce on two different streams:
 * <pre>{@code
 *     int sumOfRedWeights  = blocks.stream().filter(b -> b.getColor() == RED)
 *                                           .mapToInt(b -> b.getWeight())
 *                                           .sum();
 *     int sumOfBlueWeights = blocks.stream().filter(b -> b.getColor() == BLUE)
 *                                           .mapToInt(b -> b.getWeight())
 *                                           .sum();
 * }</pre>
 *
 * <p>However, there are other techniques that allow you to obtain both results in a single
 * pass if multiple traversal is impractical or inefficient.  TODO provide link
 *
 * <h3><a name="StreamOps">Stream operations</a></h3>
|
 *
 * <p>Intermediate stream operations (such as {@code filter} or {@code sorted}) always produce
 * a new {@code Stream}, and are always <em>lazy</em>.  Executing a lazy operation does not
 * trigger processing of the stream contents; all processing is deferred until the terminal
 * operation commences.  Processing streams lazily allows for significant efficiencies; in a
 * pipeline such as the filter-map-sum example above, filtering, mapping, and addition can be
 * fused into a single pass, with minimal intermediate state.  Laziness also enables us to avoid
 * examining all the data when it is not necessary; for operations such as "find the first
 * string longer than 1000 characters", one need not examine all the input strings, just enough
 * to find one that has the desired characteristics.  (This behavior becomes even more important
 * when the input stream is infinite and not merely large.)
 *
 * <p>Intermediate operations are further divided into <em>stateless</em> and <em>stateful</em>
 * operations.  Stateless operations retain no state from previously seen values when processing
 * a new value; examples of stateless intermediate operations include {@code filter} and
 * {@code map}.  Stateful operations may incorporate state from previously seen elements in
 * processing new values; examples of stateful intermediate operations include {@code distinct}
 * and {@code sorted}.  Stateful operations may need to process the entire input before
 * producing a result; for example, one cannot produce any results from sorting a stream until
 * one has seen all elements of the stream.  As a result, under parallel computation, some
 * pipelines containing stateful intermediate operations have to be executed in multiple passes.
 * Pipelines containing exclusively stateless intermediate operations can be processed in a
 * single pass, whether sequential or parallel.
 *
 * <p>Further, some operations are deemed <em>short-circuiting</em> operations.  An intermediate
 * operation is short-circuiting if, when presented with infinite input, it may produce a
 * finite stream as a result.  A terminal operation is short-circuiting if, when presented with
 * infinite input, it may terminate in finite time.  (Having a short-circuiting operation is a
 * necessary, but not sufficient, condition for the processing of an infinite stream to
 * terminate normally in finite time.)
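 *
 * <p>For example, the following finds the first multiple of 7 in an infinite stream of
 * natural numbers (a sketch; it assumes an {@code iterate} factory, as on the
 * {@link java.util.stream.Streams} class, for generating the infinite stream).  Here
 * {@code findFirst()} is short-circuiting, so the computation completes in finite time:
 * <pre>{@code
 *     int firstMultiple = Streams.iterate(1, n -> n + 1)   // 1, 2, 3, ... (infinite)
 *                                .filter(n -> n % 7 == 0)  // keep only multiples of 7
 *                                .findFirst()              // short-circuits at the first match
 *                                .get();                   // firstMultiple == 7
 * }</pre>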
|
 *
 * <p>Terminal operations (such as {@code forEach} or {@code findFirst}) are always eager
 * (they execute completely before returning), and produce a non-{@code Stream} result, such
 * as a primitive value or a {@code Collection}, or have side-effects.
|
 *
 * <h3>Parallelism</h3>
 *
 * <p>By recasting aggregate operations as a pipeline of operations on a stream of values, many
 * aggregate operations can be more easily parallelized.  A {@code Stream} can execute either
 * in serial or in parallel.  When streams are created, they are either created as sequential
 * or parallel streams; the parallel-ness of streams can also be switched by the
 * {@link java.util.stream.Stream#sequential()} and {@link java.util.stream.Stream#parallel()}
 * operations.  The {@code Stream} implementations in the JDK create serial streams unless
 * parallelism is explicitly requested.  For example, {@code Collection} has methods
 * {@link java.util.Collection#stream} and {@link java.util.Collection#parallelStream},
 * which produce sequential and parallel streams respectively; other stream-bearing methods
 * such as {@link java.util.stream.Streams#intRange(int, int)} produce sequential
 * streams, but these can be efficiently parallelized by calling {@code parallel()} on the
 * result.  The set of operations on serial and parallel streams is identical.  To execute the
 * "sum of weights of blocks" query in parallel, we would do:
 *
 * <pre>{@code
 *     int sumOfWeights = blocks.parallelStream().filter(b -> b.getColor() == RED)
 *                                               .mapToInt(b -> b.getWeight())
 *                                               .sum();
 * }</pre>
 *
 * <p>The only difference between the serial and parallel versions of this example code is
 * the creation of the initial {@code Stream}.  Whether a {@code Stream} will execute in serial
 * or parallel can be determined by the {@link java.util.stream.Stream#isParallel} method.  When
 * the terminal operation is initiated, the entire stream pipeline is either executed
 * sequentially or in parallel, determined by the last operation that affected the stream's
 * serial-parallel orientation (which could be the stream source, or the {@code sequential()}
 * or {@code parallel()} methods).
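 *
 * <p>For example (a sketch reusing the "blocks" query), the orientation in effect when the
 * terminal operation begins governs the whole pipeline:
 * <pre>{@code
 *     int sumOfWeights = blocks.stream()    // sequential source...
 *                              .parallel()  // ...switched to parallel, so when sum() is
 *                              .filter(b -> b.getColor() == RED)
 *                              .mapToInt(b -> b.getWeight())
 *                              .sum();      // invoked, the pipeline executes in parallel
 * }</pre>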
|
 *
 * <p>In order for the results of parallel operations to be deterministic and consistent with
 * their serial equivalents, the function values passed into the various stream operations
 * should be <a href="package-summary.html#Non-Interference"><em>stateless</em></a>.
|
 *
 * <h3><a name="Ordering">Ordering</a></h3>
 *
 * <p>Streams may or may not have an <em>encounter order</em>.  An encounter
 * order specifies the order in which elements are provided by the stream to the
 * operations pipeline.  Whether or not there is an encounter order depends on
 * the source, the intermediate operations, and the terminal operation.
 * Certain stream sources (such as {@code List} or arrays) are intrinsically
 * ordered, whereas others (such as {@code HashSet}) are not.  Some intermediate
 * operations may impose an encounter order on an otherwise unordered stream,
 * such as {@link java.util.stream.Stream#sorted()}, and others may render an
 * ordered stream unordered (such as {@link java.util.stream.Stream#unordered()}).
 * Some terminal operations may ignore encounter order, such as
 * {@link java.util.stream.Stream#forEach}.
 *
 * <p>If a stream is ordered, most operations are constrained to operate on the
 * elements in their encounter order; if the source of a stream is a {@code List}
 * containing {@code [1, 2, 3]}, then the result of executing {@code map(x -> x*2)}
 * must be {@code [2, 4, 6]}.  However, if the source has no defined encounter
 * order, then any of the six permutations of the values {@code [2, 4, 6]} would
 * be a valid result.  Many operations can still be efficiently parallelized even
 * under ordering constraints.
 *
 * <p>For sequential streams, ordering is only relevant to the determinism
 * of operations performed repeatedly on the same source.  (An {@code ArrayList}
 * is constrained to iterate elements in order; a {@code HashSet} is not, and
 * repeated iteration might produce a different order.)
 *
 * <p>For parallel streams, relaxing the ordering constraint can enable
 * optimized implementations for some operations.  For example, duplicate
 * removal on an ordered stream must completely process the first partition
 * before it can return any elements from a subsequent partition, even if those
 * elements are available earlier.  On the other hand, without the constraint of
 * ordering, duplicate removal can be done more efficiently by using a shared
 * set backed by a {@link java.util.concurrent.ConcurrentHashMap}.  There will
 * be cases where the stream is structurally ordered (the source is ordered and
 * the intermediate operations are order-preserving), but the user does not
 * particularly care about the encounter order.  In some cases, explicitly
 * de-ordering the stream with the {@link java.util.stream.Stream#unordered()}
 * method may result in improved parallel performance for some stateful or
 * terminal operations.
|
 *
 * <h3><a name="Non-Interference">Non-interference</a></h3>
 *
 * The {@code java.util.stream} package enables you to execute possibly-parallel
 * bulk-data operations over a variety of data sources, including even non-thread-safe
 * collections such as {@code ArrayList}.  This is possible only if we can
 * prevent <em>interference</em> with the data source during the execution of a
 * stream pipeline.  (Execution begins when the terminal operation is invoked, and ends
 * when the terminal operation completes.)  For most data sources, preventing interference
 * means ensuring that the data source is <em>not modified at all</em> during the execution
 * of the stream pipeline.  (Some data sources, such as concurrent collections, are
 * specifically designed to handle concurrent modification.)
 *
 * <p>Accordingly, lambda expressions (or other objects implementing the appropriate functional
 * interface) passed to stream methods should never modify the stream's data source.  An
 * implementation is said to <em>interfere</em> with the data source if it modifies, or causes
 * to be modified, the stream's data source.  The need for non-interference applies to all
 * pipelines, not just parallel ones.  Unless the stream source is concurrent, modifying a
 * stream's data source during execution of a stream pipeline can cause exceptions, incorrect
 * answers, or nonconformant results.
 *
 * <p>Further, results may be nondeterministic or incorrect if the lambda expressions passed to
 * stream operations are <em>stateful</em>.  A stateful lambda (or other object implementing the
 * appropriate functional interface) is one whose result depends on any state which might change
 * during the execution of the stream pipeline.  An example of a stateful lambda is:
 * <pre>{@code
 *     Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
 *     stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
 * }</pre>
 * Here, if the mapping operation is performed in parallel, the results for the same input
 * could vary from run to run due to thread scheduling differences, whereas with a stateless
 * lambda expression the results would always be the same.
|
 *
 * <h3>Side-effects</h3>
 *
 * <h2><a name="Reduction">Reduction operations</a></h2>
 *
 * A <em>reduction</em> operation takes a stream of elements and processes them in a way
 * that reduces to a single value or summary description, such as finding the sum or maximum
 * of a set of numbers.  (In more complex scenarios, the reduction operation might need to
 * extract data from the elements before reducing that data to a single value, such as
 * finding the sum of weights of a set of blocks.  This would require extracting the weight
 * from each block before summing up the weights.)
|
 *
 * <p>Of course, such operations can be readily implemented as simple sequential loops, as in:
 * <pre>{@code
 *     int sum = 0;
 *     for (int x : numbers) {
 *         sum += x;
 *     }
 * }</pre>
 * However, there may be a significant advantage to preferring a
 * {@link java.util.stream.Stream#reduce reduce operation}
 * over a mutative accumulation such as the above -- a properly constructed reduce operation is
 * inherently parallelizable so long as the
 * {@link java.util.function.BinaryOperator reduction operator}
 * has the right characteristics.  Specifically, the operator must be
 * <a href="#Associativity">associative</a>.  For example, given a
 * stream of numbers for which we want to find the sum, we can write:
 * <pre>{@code
 *     int sum = numbers.reduce(0, (x,y) -> x+y);
 * }</pre>
 * or more succinctly:
 * <pre>{@code
 *     int sum = numbers.reduce(0, Integer::sum);
 * }</pre>
|
 *
 * <p>(The primitive specializations of {@link java.util.stream.Stream}, such as
 * {@link java.util.stream.IntStream}, even have convenience methods for common reductions,
 * such as {@link java.util.stream.IntStream#sum() sum} and
 * {@link java.util.stream.IntStream#max() max}, which are implemented as simple wrappers
 * around reduce.)
 *
 * <p>Reduction parallelizes well since the implementation of {@code reduce} can operate on
 * subsets of the stream in parallel, and then combine the intermediate results to get the final
 * correct answer.  Even if you were to use a parallelizable form of the
 * {@link java.util.stream.Stream#forEach(Consumer) forEach()} method
 * in place of the original for-each loop above, you would still have to provide thread-safe
 * updates to the shared accumulating variable {@code sum}, and the required synchronization
 * would likely eliminate any performance gain from parallelism.  Using a {@code reduce} method
 * instead removes all of the burden of parallelizing the reduction operation, and the library
 * can provide an efficient parallel implementation with no additional synchronization needed.
|
287 * <p>The "blocks" examples shown earlier shows how reduction combines with other operations |
|
288 * to replace for loops with bulk operations. If {@code blocks} is a collection of {@code Block} |
|
289 * objects, which have a {@code getWeight} method, we can find the heaviest block with: |
|
290 * <pre>{@code |
|
291 * OptionalInt heaviest = blocks.stream() |
|
292 * .mapToInt(Block::getWeight) |
|
293 * .reduce(Integer::max); |
|
294 * }</pre> |
|
295 * |
|
296 * <p>In its more general form, a {@code reduce} operation on elements of type {@code <T>} |
|
297 * yielding a result of type {@code <U>} requires three parameters: |
|
298 * <pre>{@code |
|
299 * <U> U reduce(U identity, |
|
300 * BiFunction<U, ? super T, U> accumlator, |
|
301 * BinaryOperator<U> combiner); |
|
302 * }</pre> |
|
303 * Here, the <em>identity</em> element is both an initial seed for the reduction, and a default |
|
304 * result if there are no elements. The <em>accumulator</em> function takes a partial result and |
|
305 * the next element, and produce a new partial result. The <em>combiner</em> function combines |
|
306 * the partial results of two accumulators to produce a new partial result, and eventually the |
|
307 * final result. |
|
 *
 * <p>This form is a generalization of the two-argument form, and is also a generalization of
 * the map-reduce construct illustrated above.  If we wanted to re-cast the simple {@code sum}
 * example using the more general form, {@code 0} would be the identity element, while
 * {@code Integer::sum} would be both the accumulator and combiner.  For the sum-of-weights
 * example, this could be re-cast as:
 * <pre>{@code
 *     int sumOfWeights = blocks.stream().reduce(0,
 *                                               (sum, b) -> sum + b.getWeight(),
 *                                               Integer::sum);
 * }</pre>
 * though the map-reduce form is more readable and generally preferable.  The generalized form
 * is provided for cases where significant work can be optimized away by combining mapping and
 * reducing into a single function.
 *
 * <p>More formally, the {@code identity} value must be an <em>identity</em> for the combiner
 * function.  This means that for all {@code u}, {@code combiner.apply(identity, u)} is equal
 * to {@code u}.  Additionally, the {@code combiner} function must be
 * <a href="#Associativity">associative</a> and must be compatible with the {@code accumulator}
 * function; for all {@code u} and {@code t}, the following must hold:
 * <pre>{@code
 *     combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
 * }</pre>
|
 *
 * <h3><a name="MutableReduction">Mutable Reduction</a></h3>
 *
 * A <em>mutable</em> reduction operation is similar to an ordinary reduction, in that it
 * reduces a stream of values to a single value, but instead of producing a distinct
 * single-valued result, it mutates a general <em>result container</em>, such as a
 * {@code Collection} or {@code StringBuilder}, as it processes the elements in the stream.
 *
 * <p>For example, if we wanted to take a stream of strings and concatenate them into a single
 * long string, we <em>could</em> achieve this with ordinary reduction:
 * <pre>{@code
 *     String concatenated = strings.reduce("", String::concat);
 * }</pre>
 *
 * We would get the desired result, and it would even work in parallel.  However, we might not
 * be happy about the performance!  Such an implementation would do a great deal of string
 * copying, and the run time would be <em>O(n^2)</em> in the number of elements.  A more
 * performant approach would be to accumulate the results into a {@link java.lang.StringBuilder},
 * which is a mutable container for accumulating strings.  We can use the same technique to
 * parallelize mutable reduction as we do with ordinary reduction.
|
 *
 * <p>The mutable reduction operation is called
 * {@link java.util.stream.Stream#collect(Collector) collect()}, as it collects together the
 * desired results into a result container such as {@code StringBuilder}.
 * A {@code collect} operation requires three things: a factory function which will construct
 * new instances of the result container, an accumulating function that will update a result
 * container by incorporating a new element, and a combining function that can take two
 * result containers and merge their contents.  The form of this is very similar to the general
 * form of ordinary reduction:
 * <pre>{@code
 *     <R> R collect(Supplier<R> resultFactory,
 *                   BiConsumer<R, ? super T> accumulator,
 *                   BiConsumer<R, R> combiner);
 * }</pre>
 * As with {@code reduce()}, the benefit of expressing {@code collect} in this abstract way is
 * that it is directly amenable to parallelization: we can accumulate partial results in
 * parallel and then combine them.  For example, to collect the String representations of the
 * elements in a stream into an {@code ArrayList}, we could write the obvious sequential
 * for-each form:
 * <pre>{@code
 *     ArrayList<String> strings = new ArrayList<>();
 *     for (T element : stream) {
 *         strings.add(element.toString());
 *     }
 * }</pre>
 * Or we could use a parallelizable collect form:
 * <pre>{@code
 *     ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
 *                                                (c, e) -> c.add(e.toString()),
 *                                                (c1, c2) -> c1.addAll(c2));
 * }</pre>
 * or, noting that we have buried a mapping operation inside the accumulator function, more
 * succinctly as:
 * <pre>{@code
 *     ArrayList<String> strings = stream.map(Object::toString)
 *                                       .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
 * }</pre>
 * Here, our supplier is just the {@link java.util.ArrayList#ArrayList() ArrayList constructor},
 * the accumulator adds the stringified element to an {@code ArrayList}, and the combiner simply
 * uses {@link java.util.ArrayList#addAll addAll} to copy the strings from one container into
 * the other.
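 *
 * <p>Returning to the string-concatenation example above, the {@code StringBuilder}-based
 * mutable reduction can be expressed with this form of {@code collect} (a sketch; it assumes
 * {@code strings} is a {@code Stream<String>}):
 * <pre>{@code
 *     String concatenated = strings.collect(StringBuilder::new,
 *                                           StringBuilder::append,
 *                                           StringBuilder::append)
 *                                  .toString();
 * }</pre>
 * Each accumulation appends to an existing buffer rather than copying the entire partial
 * result, avoiding the <em>O(n^2)</em> copying cost of the ordinary-reduction version.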
|
 *
 * <p>As with the regular reduction operation, the ability to parallelize only comes if an
 * <a href="package-summary.html#Associativity">associativity</a> condition is met.  The
 * {@code combiner} is associative if for result containers {@code r1}, {@code r2}, and
 * {@code r3}:
 * <pre>{@code
 *     combiner.accept(r1, r2);
 *     combiner.accept(r1, r3);
 * }</pre>
 * is equivalent to
 * <pre>{@code
 *     combiner.accept(r2, r3);
 *     combiner.accept(r1, r2);
 * }</pre>
 * where equivalence means that {@code r1} is left in the same state (according to the meaning
 * of {@link java.lang.Object#equals equals} for the element types).  Similarly, the
 * {@code resultFactory} must act as an <em>identity</em> with respect to the {@code combiner}
 * so that for any result container {@code r}:
 * <pre>{@code
 *     combiner.accept(r, resultFactory.get());
 * }</pre>
 * does not modify the state of {@code r} (again according to the meaning of
 * {@link java.lang.Object#equals equals}).  Finally, the {@code accumulator} and
 * {@code combiner} must be compatible such that for a result container {@code r} and element
 * {@code t}:
 * <pre>{@code
 *     r2 = resultFactory.get();
 *     accumulator.accept(r2, t);
 *     combiner.accept(r, r2);
 * }</pre>
 * is equivalent to:
 * <pre>{@code
 *     accumulator.accept(r, t);
 * }</pre>
 * where equivalence means that {@code r} is left in the same state (again according to the
 * meaning of {@link java.lang.Object#equals equals}).
 *
 * <p>The three aspects of {@code collect} -- supplier, accumulator, and combiner -- are often
 * very tightly coupled, and it is convenient to introduce the notion of a
 * {@link java.util.stream.Collector} as being an object that embodies all three aspects.
 * There is a {@link java.util.stream.Stream#collect(Collector) collect} method that simply
 * takes a {@code Collector} and returns the resulting container.
 * The above example for collecting strings into a {@code List} can be rewritten using a
 * standard {@code Collector} as:
 * <pre>{@code
 *     List<String> strings = stream.map(Object::toString)
 *                                  .collect(Collectors.toList());
 * }</pre>
|
 *
 * <h3><a name="ConcurrentReduction">Reduction, Concurrency, and Ordering</a></h3>
 *
 * With some complex reduction operations, for example a collect that produces a
 * {@code Map}, such as:
 * <pre>{@code
 *     Map<Buyer, List<Transaction>> salesByBuyer
 *         = txns.parallelStream()
 *               .collect(Collectors.groupingBy(Transaction::getBuyer));
 * }</pre>
 * (where {@link java.util.stream.Collectors#groupingBy} is a utility function
 * that returns a {@link java.util.stream.Collector} for grouping sets of elements based on
 * some key) it may actually be counterproductive to perform the operation in parallel.
 * This is because the combining step (merging one {@code Map} into another by key)
 * can be expensive for some {@code Map} implementations.
 *
 * <p>Suppose, however, that the result container used in this reduction
 * was a concurrently modifiable collection -- such as a
 * {@link java.util.concurrent.ConcurrentHashMap ConcurrentHashMap}.  In that case,
 * the parallel invocations of the accumulator could actually deposit their results
 * concurrently into the same shared result container, eliminating the need for the combiner
 * to merge distinct result containers.  This potentially provides a boost
 * to the parallel execution performance.  We call this a <em>concurrent</em> reduction.
 *
 * <p>A {@link java.util.stream.Collector} that supports concurrent reduction is marked with
 * the {@link java.util.stream.Collector.Characteristics#CONCURRENT} characteristic.
 * Having a concurrent collector is a necessary condition for performing a
 * concurrent reduction, but that alone is not sufficient.  If you imagine multiple
 * accumulators depositing results into a shared container, the order in which
 * results are deposited is non-deterministic.  Consequently, a concurrent reduction
 * is only possible if ordering is not important for the stream being processed.
 * The {@link java.util.stream.Stream#collect(Collector)}
 * implementation will only perform a concurrent reduction if:
 * <ul>
 *     <li>The stream is parallel;</li>
 *     <li>The collector has the
 *     {@link java.util.stream.Collector.Characteristics#CONCURRENT} characteristic;
 *     and</li>
 *     <li>Either the stream is unordered, or the collector has the
 *     {@link java.util.stream.Collector.Characteristics#UNORDERED} characteristic.</li>
 * </ul>
 * For example:
 * <pre>{@code
 *     Map<Buyer, List<Transaction>> salesByBuyer
 *         = txns.parallelStream()
 *               .unordered()
 *               .collect(groupingByConcurrent(Transaction::getBuyer));
 * }</pre>
 * (where {@link java.util.stream.Collectors#groupingByConcurrent} is the concurrent companion
 * to {@code groupingBy}).
 *
 * <p>Note that if it is important that the elements for a given key appear in the
 * order they appear in the source, then we cannot use a concurrent reduction,
 * as ordering is one of the casualties of concurrent insertion.  We would then
 * be constrained to implement either a sequential reduction or a merge-based
 * parallel reduction.
|
490 * |
|
 * <h2><a name="Associativity">Associativity</a></h2>
 *
 * An operator or function {@code op} is <em>associative</em> if the following holds:
 * <pre>{@code
 *     (a op b) op c == a op (b op c)
 * }</pre>
 * The importance of this to parallel evaluation can be seen if we expand this to four terms:
 * <pre>{@code
 *     a op b op c op d == (a op b) op (c op d)
 * }</pre>
 * So we can evaluate {@code (a op b)} in parallel with {@code (c op d)} and then invoke {@code op} on
 * the results.
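 * For example, addition is associative, so a sum may be computed from any grouping
 * of partial sums. Subtraction is not associative, since {@code (a - b) - c} and
 * {@code a - (b - c)} generally differ, so it must not be used as a reduction operator:
 * <pre>{@code
 *     // Safe: addition is associative, so splitting does not affect the result
 *     int sum = IntStream.rangeClosed(1, 4).parallel().reduce(0, Integer::sum);  // 10
 *
 *     // Broken: subtraction is not associative, so the result of
 *     //     IntStream.rangeClosed(1, 4).parallel().reduce(0, (a, b) -> a - b)
 *     // depends on how the stream happens to be split
 * }</pre>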
|
 * <p>Examples of associative operations include numeric addition, min, max, and
 * string concatenation. Associativity applies equally to mutative combining
 * functions (such as {@code StringBuilder::append}): the grouping of the combine
 * steps must not affect the final accumulated result.
 *
|
 * <h2><a name="StreamSources">Stream sources</a></h2>
 *
 * Streams may be constructed from a variety of sources, including:
 * <ul>
 *     <li>a {@code Collection};</li>
 *     <li>an {@code Iterator};</li>
 *     <li>an array;</li>
 *     <li>a generator function;</li>
 *     <li>a {@code Spliterator}.</li>
 * </ul>
 *
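 * For example (a sketch; the exact factory classes were still in flux in this draft,
 * and the methods below are as they appear in the released API):
 * <pre>{@code
 *     Stream<String> fromCollection = Arrays.asList("a", "b").stream();
 *     IntStream fromArray = Arrays.stream(new int[] {1, 2, 3});
 *     Stream<Double> fromGenerator = Stream.generate(Math::random).limit(3);
 * }</pre>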
|
 * <p>The remainder of this section describes a low-level but important aspect of
 * stream construction: the role of spliterators.
 *
|
 * <p>A pipeline is initially constructed from a spliterator (see {@link java.util.Spliterator}) supplied by a stream source.
 * The spliterator covers the elements of the source and provides element traversal operations
 * for a possibly-parallel computation. See the methods on {@link java.util.stream.Streams} for construction
 * of pipelines using spliterators.
 *
 * <p>A source may directly supply a spliterator. If so, the spliterator is traversed, split, or queried
 * for estimated size after, and never before, the terminal operation commences. It is strongly recommended
 * that the spliterator report a characteristic of {@code IMMUTABLE} or {@code CONCURRENT}, or be
 * <em>late-binding</em>, not binding to the elements it covers until traversed, split, or queried for
 * estimated size.
|
 *
 * <p>If a source cannot directly supply a recommended spliterator, then it may indirectly supply a spliterator
 * using a {@code Supplier}. The spliterator is obtained from the supplier after, and never before, the terminal
 * operation of the stream pipeline commences.
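 * A sketch of supplier-based construction (shown here with {@code StreamSupport},
 * which plays this role in the released API; this draft's {@code Streams} class
 * served a similar purpose):
 * <pre>{@code
 *     List<String> source = new ArrayList<>(Arrays.asList("one", "two"));
 *     // The spliterator is obtained from the supplier only when the terminal
 *     // operation commences, so the add() below is still reflected:
 *     Stream<String> s =
 *         StreamSupport.stream(source::spliterator, Spliterator.ORDERED, false);
 *     source.add("three");
 *     long n = s.count();  // 3
 * }</pre>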
|
 *
 * <p>Such requirements significantly reduce the scope of potential interference, limiting it to the
 * interval between the commencement of the terminal operation and the production of a result or
 * side-effect. See
 * <a href="package-summary.html#Non-Interference">Non-Interference</a> for
 * more details.
|
 *
|
 * <p>A source can be modified before the terminal operation commences, and those modifications will be reflected in
 * the covered elements. Afterwards, and depending on the properties of the source, further modifications
 * might not be reflected, and a {@code ConcurrentModificationException} may be thrown.
 *
|
 * <p>For example, consider the following code:
 * <pre>{@code
 *     List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
 *     Stream<String> sl = l.stream();
 *     l.add("three");
 *     String s = sl.collect(toStringJoiner(" ")).toString();
 * }</pre>
|
 * First a list is created consisting of two strings, "one" and "two". Then a stream is created from that list.
 * Next the list is modified by adding a third string, "three". Finally the elements of the stream are collected
 * and joined together. Since the list was modified before the terminal {@code collect} operation commenced,
 * the result will be the string "one two three". However, if the list is modified after the terminal operation
 * commences, as in:
|
 * <pre>{@code
 *     List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
 *     Stream<String> sl = l.stream();
 *     String s = sl.peek(e -> l.add("BAD LAMBDA")).collect(toStringJoiner(" ")).toString();
 * }</pre>
|
 * then a {@code ConcurrentModificationException} will be thrown, since the {@code peek} operation will attempt
 * to add the string "BAD LAMBDA" to the list after the terminal operation has commenced.
 */

package java.util.stream;