|
1 /* |
|
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 package java.util.stream; |
|
26 |
|
27 import java.util.Spliterator; |
|
28 import java.util.concurrent.CountedCompleter; |
|
29 import java.util.concurrent.ForkJoinPool; |
|
30 |
|
31 /** |
|
32 * Abstract base class for most fork-join tasks used to implement stream ops. |
|
33 * Manages splitting logic, tracking of child tasks, and intermediate results. |
|
34 * Each task is associated with a {@link Spliterator} that describes the portion |
|
35 * of the input associated with the subtree rooted at this task. |
|
36 * Tasks may be leaf nodes (which will traverse the elements of |
|
37 * the {@code Spliterator}) or internal nodes (which split the |
|
38 * {@code Spliterator} into multiple child tasks). |
|
39 * |
|
40 * @implNote |
|
41 * <p>This class is based on {@link CountedCompleter}, a form of fork-join task |
|
42 * where each task has a semaphore-like count of uncompleted children, and the |
|
43 * task is implicitly completed and notified when its last child completes. |
|
44 * Internal node tasks will likely override the {@code onCompletion} method from |
|
45 * {@code CountedCompleter} to merge the results from child tasks into the |
|
46 * current task's result. |
|
47 * |
|
48 * <p>Splitting and setting up the child task links is done by {@code compute()} |
|
49 * for internal nodes. At {@code compute()} time for leaf nodes, it is |
|
50 * guaranteed that the parent's child-related fields (including sibling links |
|
51 * for the parent's children) will be set up for all children. |
|
52 * |
|
53 * <p>For example, a task that performs a reduce would override {@code doLeaf()} |
|
54 * to perform a reduction on that leaf node's chunk using the |
|
55 * {@code Spliterator}, and override {@code onCompletion()} to merge the results |
|
56 * of the child tasks for internal nodes: |
|
57 * |
|
58 * <pre>{@code |
|
59 * protected S doLeaf() { |
|
60 * spliterator.forEach(...); |
|
61 * return localReductionResult; |
|
62 * } |
|
63 * |
|
64 * public void onCompletion(CountedCompleter caller) { |
|
65 * if (!isLeaf()) { |
|
66 * ReduceTask<P_IN, P_OUT, T, R> child = children; |
|
67 * R result = child.getLocalResult(); |
|
68 * child = child.nextSibling; |
|
69 * for (; child != null; child = child.nextSibling) |
|
70 * result = combine(result, child.getLocalResult()); |
|
71 * setLocalResult(result); |
|
72 * } |
|
73 * } |
|
74 * }</pre> |
|
75 * |
|
76 * @param <P_IN> Type of elements input to the pipeline |
|
77 * @param <P_OUT> Type of elements output from the pipeline |
|
78 * @param <R> Type of intermediate result, which may be different from operation |
|
79 * result type |
|
80 * @param <K> Type of parent, child and sibling tasks |
|
81 * @since 1.8 |
|
82 */ |
|
83 abstract class AbstractTask<P_IN, P_OUT, R, |
|
84 K extends AbstractTask<P_IN, P_OUT, R, K>> |
|
85 extends CountedCompleter<R> { |
|
86 |
|
87 /** |
|
88 * Default target factor of leaf tasks for parallel decomposition. |
|
89 * To allow load balancing, we over-partition, currently to approximately |
|
90 * four tasks per processor, which enables others to help out |
|
91 * if leaf tasks are uneven or some processors are otherwise busy. |
|
92 */ |
|
93 static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; |
|
94 |
|
95 /** The pipeline helper, common to all tasks in a computation */ |
|
96 protected final PipelineHelper<P_OUT> helper; |
|
97 |
|
98 /** |
|
99 * The spliterator for the portion of the input associated with the subtree |
|
100 * rooted at this task |
|
101 */ |
|
102 protected Spliterator<P_IN> spliterator; |
|
103 |
|
104 /** Target leaf size, common to all tasks in a computation */ |
|
105 protected final long targetSize; |
|
106 |
|
107 /** |
|
108 * The left child. |
|
109 * null if no children |
|
110 * if non-null rightChild is non-null |
|
111 */ |
|
112 protected K leftChild; |
|
113 |
|
114 /** |
|
115 * The right child. |
|
116 * null if no children |
|
117 * if non-null leftChild is non-null |
|
118 */ |
|
119 protected K rightChild; |
|
120 |
|
121 /** The result of this node, if completed */ |
|
122 private R localResult; |
|
123 |
|
124 /** |
|
125 * Constructor for root nodes. |
|
126 * |
|
127 * @param helper The {@code PipelineHelper} describing the stream pipeline |
|
128 * up to this operation |
|
129 * @param spliterator The {@code Spliterator} describing the source for this |
|
130 * pipeline |
|
131 */ |
|
132 protected AbstractTask(PipelineHelper<P_OUT> helper, |
|
133 Spliterator<P_IN> spliterator) { |
|
134 super(null); |
|
135 this.helper = helper; |
|
136 this.spliterator = spliterator; |
|
137 this.targetSize = suggestTargetSize(spliterator.estimateSize()); |
|
138 } |
|
139 |
|
140 /** |
|
141 * Constructor for non-root nodes. |
|
142 * |
|
143 * @param parent this node's parent task |
|
144 * @param spliterator {@code Spliterator} describing the subtree rooted at |
|
145 * this node, obtained by splitting the parent {@code Spliterator} |
|
146 */ |
|
147 protected AbstractTask(K parent, |
|
148 Spliterator<P_IN> spliterator) { |
|
149 super(parent); |
|
150 this.spliterator = spliterator; |
|
151 this.helper = parent.helper; |
|
152 this.targetSize = parent.targetSize; |
|
153 } |
|
154 |
|
155 /** |
|
156 * Constructs a new node of type T whose parent is the receiver; must call |
|
157 * the AbstractTask(T, Spliterator) constructor with the receiver and the |
|
158 * provided Spliterator. |
|
159 * |
|
160 * @param spliterator {@code Spliterator} describing the subtree rooted at |
|
161 * this node, obtained by splitting the parent {@code Spliterator} |
|
162 * @return newly constructed child node |
|
163 */ |
|
164 protected abstract K makeChild(Spliterator<P_IN> spliterator); |
|
165 |
|
166 /** |
|
167 * Computes the result associated with a leaf node. Will be called by |
|
168 * {@code compute()} and the result passed to @{code setLocalResult()} |
|
169 * |
|
170 * @return the computed result of a leaf node |
|
171 */ |
|
172 protected abstract R doLeaf(); |
|
173 |
|
174 /** |
|
175 * Returns a suggested target leaf size based on the initial size estimate. |
|
176 * |
|
177 * @return suggested target leaf size |
|
178 */ |
|
179 public static long suggestTargetSize(long sizeEstimate) { |
|
180 long est = sizeEstimate / LEAF_TARGET; |
|
181 return est > 0L ? est : 1L; |
|
182 } |
|
183 |
|
184 /** |
|
185 * Returns a suggestion whether it is advisable to split the provided |
|
186 * spliterator based on target size and other considerations, such as pool |
|
187 * state. |
|
188 * |
|
189 * @return {@code true} if a split is advised otherwise {@code false} |
|
190 */ |
|
191 public static boolean suggestSplit(Spliterator spliterator, |
|
192 long targetSize) { |
|
193 long remaining = spliterator.estimateSize(); |
|
194 return (remaining > targetSize); |
|
195 // @@@ May additionally want to fold in pool characteristics such as surplus task count |
|
196 } |
|
197 |
|
198 /** |
|
199 * Returns a suggestion whether it is adviseable to split this task based on |
|
200 * target size and other considerations. |
|
201 * |
|
202 * @return {@code true} if a split is advised otherwise {@code false} |
|
203 */ |
|
204 public boolean suggestSplit() { |
|
205 return suggestSplit(spliterator, targetSize); |
|
206 } |
|
207 |
|
208 /** |
|
209 * Returns the local result, if any. Subclasses should use |
|
210 * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage |
|
211 * results. This returns the local result so that calls from within the |
|
212 * fork-join framework will return the correct result. |
|
213 * |
|
214 * @return local result for this node previously stored with |
|
215 * {@link #setLocalResult} |
|
216 */ |
|
217 @Override |
|
218 public R getRawResult() { |
|
219 return localResult; |
|
220 } |
|
221 |
|
222 /** |
|
223 * Does nothing; instead, subclasses should use |
|
224 * {@link #setLocalResult(Object)}} to manage results. |
|
225 * |
|
226 * @param result must be null, or an exception is thrown (this is a safety |
|
227 * tripwire to detect when {@code setRawResult()} is being used |
|
228 * instead of {@code setLocalResult()} |
|
229 */ |
|
230 @Override |
|
231 protected void setRawResult(R result) { |
|
232 if (result != null) |
|
233 throw new IllegalStateException(); |
|
234 } |
|
235 |
|
236 /** |
|
237 * Retrieves a result previously stored with {@link #setLocalResult} |
|
238 * |
|
239 * @return local result for this node previously stored with |
|
240 * {@link #setLocalResult} |
|
241 */ |
|
242 protected R getLocalResult() { |
|
243 return localResult; |
|
244 } |
|
245 |
|
246 /** |
|
247 * Associates the result with the task, can be retrieved with |
|
248 * {@link #getLocalResult} |
|
249 * |
|
250 * @param localResult local result for this node |
|
251 */ |
|
252 protected void setLocalResult(R localResult) { |
|
253 this.localResult = localResult; |
|
254 } |
|
255 |
|
256 /** |
|
257 * Indicates whether this task is a leaf node. (Only valid after |
|
258 * {@link #compute} has been called on this node). If the node is not a |
|
259 * leaf node, then children will be non-null and numChildren will be |
|
260 * positive. |
|
261 * |
|
262 * @return {@code true} if this task is a leaf node |
|
263 */ |
|
264 protected boolean isLeaf() { |
|
265 return leftChild == null; |
|
266 } |
|
267 |
|
268 /** |
|
269 * Indicates whether this task is the root node |
|
270 * |
|
271 * @return {@code true} if this task is the root node. |
|
272 */ |
|
273 protected boolean isRoot() { |
|
274 return getParent() == null; |
|
275 } |
|
276 |
|
277 /** |
|
278 * Returns the parent of this task, or null if this task is the root |
|
279 * |
|
280 * @return the parent of this task, or null if this task is the root |
|
281 */ |
|
282 @SuppressWarnings("unchecked") |
|
283 protected K getParent() { |
|
284 return (K) getCompleter(); |
|
285 } |
|
286 |
|
287 /** |
|
288 * Decides whether or not to split a task further or compute it directly. If |
|
289 * computing directly, call {@code doLeaf} and pass the result to |
|
290 * {@code setRawResult}. If splitting, set up the child-related fields, |
|
291 * create the child tasks, fork the leftmost (prefix) child tasks, and |
|
292 * compute the rightmost (remaining) child tasks. |
|
293 * |
|
294 * <p> |
|
295 * Computing will continue for rightmost tasks while a task can be computed |
|
296 * as determined by {@link #canCompute()} and that task should and can be |
|
297 * split into left and right tasks. |
|
298 * |
|
299 * <p> |
|
300 * The rightmost tasks are computed in a loop rather than recursively to |
|
301 * avoid potential stack overflows when computing with a right-balanced |
|
302 * tree, such as that produced when splitting with a {@link Spliterator} |
|
303 * created from an {@link java.util.Iterator}. |
|
304 */ |
|
305 @Override |
|
306 public final void compute() { |
|
307 @SuppressWarnings("unchecked") |
|
308 K task = (K) this; |
|
309 while (task.canCompute()) { |
|
310 Spliterator<P_IN> split; |
|
311 if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) { |
|
312 task.setLocalResult(task.doLeaf()); |
|
313 task.tryComplete(); |
|
314 return; |
|
315 } |
|
316 else { |
|
317 K l = task.leftChild = task.makeChild(split); |
|
318 K r = task.rightChild = task.makeChild(task.spliterator); |
|
319 task.setPendingCount(1); |
|
320 l.fork(); |
|
321 task = r; |
|
322 } |
|
323 } |
|
324 } |
|
325 |
|
326 /** |
|
327 * {@inheritDoc} |
|
328 * |
|
329 * @implNote |
|
330 * Clears spliterator and children fields. Overriders MUST call |
|
331 * {@code super.onCompletion} as the last thing they do if they want these |
|
332 * cleared. |
|
333 */ |
|
334 @Override |
|
335 public void onCompletion(CountedCompleter<?> caller) { |
|
336 spliterator = null; |
|
337 leftChild = rightChild = null; |
|
338 } |
|
339 |
|
340 /** |
|
341 * Determines if the task can be computed. |
|
342 * |
|
343 * @implSpec The default always returns true |
|
344 * |
|
345 * @return {@code true} if this task can be computed to either calculate the |
|
346 * leaf via {@link #doLeaf()} or split, otherwise false if this task |
|
347 * cannot be computed, for example if this task has been canceled |
|
348 * and/or a result for the computation has been found by another |
|
349 * task. |
|
350 */ |
|
351 protected boolean canCompute() { |
|
352 return true; |
|
353 } |
|
354 |
|
355 /** |
|
356 * Returns whether this node is a "leftmost" node -- whether the path from |
|
357 * the root to this node involves only traversing leftmost child links. For |
|
358 * a leaf node, this means it is the first leaf node in the encounter order. |
|
359 * |
|
360 * @return {@code true} if this node is a "leftmost" node |
|
361 */ |
|
362 protected boolean isLeftmostNode() { |
|
363 @SuppressWarnings("unchecked") |
|
364 K node = (K) this; |
|
365 while (node != null) { |
|
366 K parent = node.getParent(); |
|
367 if (parent != null && parent.leftChild != node) |
|
368 return false; |
|
369 node = parent; |
|
370 } |
|
371 return true; |
|
372 } |
|
373 } |