/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
|
25 package java.util.stream; |
|
26 |
|
27 import java.util.Spliterator; |
|
28 import java.util.concurrent.CountedCompleter; |
|
29 import java.util.concurrent.ForkJoinPool; |
|
30 |
|
31 /** |
|
32 * Abstract base class for most fork-join tasks used to implement stream ops. |
|
33 * Manages splitting logic, tracking of child tasks, and intermediate results. |
|
34 * Each task is associated with a {@link Spliterator} that describes the portion |
|
35 * of the input associated with the subtree rooted at this task. |
|
36 * Tasks may be leaf nodes (which will traverse the elements of |
|
37 * the {@code Spliterator}) or internal nodes (which split the |
|
38 * {@code Spliterator} into multiple child tasks). |
|
39 * |
|
40 * @implNote |
|
41 * <p>This class is based on {@link CountedCompleter}, a form of fork-join task |
|
42 * where each task has a semaphore-like count of uncompleted children, and the |
|
43 * task is implicitly completed and notified when its last child completes. |
|
44 * Internal node tasks will likely override the {@code onCompletion} method from |
|
45 * {@code CountedCompleter} to merge the results from child tasks into the |
|
46 * current task's result. |
|
47 * |
|
48 * <p>Splitting and setting up the child task links is done by {@code compute()} |
|
49 * for internal nodes. At {@code compute()} time for leaf nodes, it is |
|
50 * guaranteed that the parent's child-related fields (including sibling links |
|
51 * for the parent's children) will be set up for all children. |
|
52 * |
|
53 * <p>For example, a task that performs a reduce would override {@code doLeaf()} |
|
54 * to perform a reduction on that leaf node's chunk using the |
|
55 * {@code Spliterator}, and override {@code onCompletion()} to merge the results |
|
56 * of the child tasks for internal nodes: |
|
57 * |
|
58 * <pre>{@code |
|
59 * protected S doLeaf() { |
|
60 * spliterator.forEach(...); |
|
61 * return localReductionResult; |
|
62 * } |
|
63 * |
|
64 * public void onCompletion(CountedCompleter caller) { |
|
65 * if (!isLeaf()) { |
|
66 * ReduceTask<P_IN, P_OUT, T, R> child = children; |
|
67 * R result = child.getLocalResult(); |
|
68 * child = child.nextSibling; |
|
69 * for (; child != null; child = child.nextSibling) |
|
70 * result = combine(result, child.getLocalResult()); |
|
71 * setLocalResult(result); |
|
72 * } |
|
73 * } |
|
74 * }</pre> |
|
75 * |
|
76 * <p>Serialization is not supported as there is no intention to serialize |
|
77 * tasks managed by stream ops. |
|
78 * |
|
79 * @param <P_IN> Type of elements input to the pipeline |
|
80 * @param <P_OUT> Type of elements output from the pipeline |
|
81 * @param <R> Type of intermediate result, which may be different from operation |
|
82 * result type |
|
83 * @param <K> Type of parent, child and sibling tasks |
|
84 * @since 1.8 |
|
85 */ |
|
86 @SuppressWarnings("serial") |
|
87 abstract class AbstractTask<P_IN, P_OUT, R, |
|
88 K extends AbstractTask<P_IN, P_OUT, R, K>> |
|
89 extends CountedCompleter<R> { |
|
90 |
|
91 /** |
|
92 * Default target factor of leaf tasks for parallel decomposition. |
|
93 * To allow load balancing, we over-partition, currently to approximately |
|
94 * four tasks per processor, which enables others to help out |
|
95 * if leaf tasks are uneven or some processors are otherwise busy. |
|
96 */ |
|
97 static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; |
|
98 |
|
99 /** The pipeline helper, common to all tasks in a computation */ |
|
100 protected final PipelineHelper<P_OUT> helper; |
|
101 |
|
102 /** |
|
103 * The spliterator for the portion of the input associated with the subtree |
|
104 * rooted at this task |
|
105 */ |
|
106 protected Spliterator<P_IN> spliterator; |
|
107 |
|
108 /** Target leaf size, common to all tasks in a computation */ |
|
109 protected long targetSize; // may be lazily initialized |
|
110 |
|
111 /** |
|
112 * The left child. |
|
113 * null if no children |
|
114 * if non-null rightChild is non-null |
|
115 */ |
|
116 protected K leftChild; |
|
117 |
|
118 /** |
|
119 * The right child. |
|
120 * null if no children |
|
121 * if non-null leftChild is non-null |
|
122 */ |
|
123 protected K rightChild; |
|
124 |
|
125 /** The result of this node, if completed */ |
|
126 private R localResult; |
|
127 |
|
128 /** |
|
129 * Constructor for root nodes. |
|
130 * |
|
131 * @param helper The {@code PipelineHelper} describing the stream pipeline |
|
132 * up to this operation |
|
133 * @param spliterator The {@code Spliterator} describing the source for this |
|
134 * pipeline |
|
135 */ |
|
136 protected AbstractTask(PipelineHelper<P_OUT> helper, |
|
137 Spliterator<P_IN> spliterator) { |
|
138 super(null); |
|
139 this.helper = helper; |
|
140 this.spliterator = spliterator; |
|
141 this.targetSize = 0L; |
|
142 } |
|
143 |
|
144 /** |
|
145 * Constructor for non-root nodes. |
|
146 * |
|
147 * @param parent this node's parent task |
|
148 * @param spliterator {@code Spliterator} describing the subtree rooted at |
|
149 * this node, obtained by splitting the parent {@code Spliterator} |
|
150 */ |
|
151 protected AbstractTask(K parent, |
|
152 Spliterator<P_IN> spliterator) { |
|
153 super(parent); |
|
154 this.spliterator = spliterator; |
|
155 this.helper = parent.helper; |
|
156 this.targetSize = parent.targetSize; |
|
157 } |
|
158 |
|
159 /** |
|
160 * Constructs a new node of type T whose parent is the receiver; must call |
|
161 * the AbstractTask(T, Spliterator) constructor with the receiver and the |
|
162 * provided Spliterator. |
|
163 * |
|
164 * @param spliterator {@code Spliterator} describing the subtree rooted at |
|
165 * this node, obtained by splitting the parent {@code Spliterator} |
|
166 * @return newly constructed child node |
|
167 */ |
|
168 protected abstract K makeChild(Spliterator<P_IN> spliterator); |
|
169 |
|
170 /** |
|
171 * Computes the result associated with a leaf node. Will be called by |
|
172 * {@code compute()} and the result passed to @{code setLocalResult()} |
|
173 * |
|
174 * @return the computed result of a leaf node |
|
175 */ |
|
176 protected abstract R doLeaf(); |
|
177 |
|
178 /** |
|
179 * Returns a suggested target leaf size based on the initial size estimate. |
|
180 * |
|
181 * @return suggested target leaf size |
|
182 */ |
|
183 public static long suggestTargetSize(long sizeEstimate) { |
|
184 long est = sizeEstimate / LEAF_TARGET; |
|
185 return est > 0L ? est : 1L; |
|
186 } |
|
187 |
|
188 /** |
|
189 * Returns the targetSize, initializing it via the supplied |
|
190 * size estimate if not already initialized. |
|
191 */ |
|
192 protected final long getTargetSize(long sizeEstimate) { |
|
193 long s; |
|
194 return ((s = targetSize) != 0 ? s : |
|
195 (targetSize = suggestTargetSize(sizeEstimate))); |
|
196 } |
|
197 |
|
198 /** |
|
199 * Returns the local result, if any. Subclasses should use |
|
200 * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage |
|
201 * results. This returns the local result so that calls from within the |
|
202 * fork-join framework will return the correct result. |
|
203 * |
|
204 * @return local result for this node previously stored with |
|
205 * {@link #setLocalResult} |
|
206 */ |
|
207 @Override |
|
208 public R getRawResult() { |
|
209 return localResult; |
|
210 } |
|
211 |
|
212 /** |
|
213 * Does nothing; instead, subclasses should use |
|
214 * {@link #setLocalResult(Object)}} to manage results. |
|
215 * |
|
216 * @param result must be null, or an exception is thrown (this is a safety |
|
217 * tripwire to detect when {@code setRawResult()} is being used |
|
218 * instead of {@code setLocalResult()} |
|
219 */ |
|
220 @Override |
|
221 protected void setRawResult(R result) { |
|
222 if (result != null) |
|
223 throw new IllegalStateException(); |
|
224 } |
|
225 |
|
226 /** |
|
227 * Retrieves a result previously stored with {@link #setLocalResult} |
|
228 * |
|
229 * @return local result for this node previously stored with |
|
230 * {@link #setLocalResult} |
|
231 */ |
|
232 protected R getLocalResult() { |
|
233 return localResult; |
|
234 } |
|
235 |
|
236 /** |
|
237 * Associates the result with the task, can be retrieved with |
|
238 * {@link #getLocalResult} |
|
239 * |
|
240 * @param localResult local result for this node |
|
241 */ |
|
242 protected void setLocalResult(R localResult) { |
|
243 this.localResult = localResult; |
|
244 } |
|
245 |
|
246 /** |
|
247 * Indicates whether this task is a leaf node. (Only valid after |
|
248 * {@link #compute} has been called on this node). If the node is not a |
|
249 * leaf node, then children will be non-null and numChildren will be |
|
250 * positive. |
|
251 * |
|
252 * @return {@code true} if this task is a leaf node |
|
253 */ |
|
254 protected boolean isLeaf() { |
|
255 return leftChild == null; |
|
256 } |
|
257 |
|
258 /** |
|
259 * Indicates whether this task is the root node |
|
260 * |
|
261 * @return {@code true} if this task is the root node. |
|
262 */ |
|
263 protected boolean isRoot() { |
|
264 return getParent() == null; |
|
265 } |
|
266 |
|
267 /** |
|
268 * Returns the parent of this task, or null if this task is the root |
|
269 * |
|
270 * @return the parent of this task, or null if this task is the root |
|
271 */ |
|
272 @SuppressWarnings("unchecked") |
|
273 protected K getParent() { |
|
274 return (K) getCompleter(); |
|
275 } |
|
276 |
|
277 /** |
|
278 * Decides whether or not to split a task further or compute it |
|
279 * directly. If computing directly, calls {@code doLeaf} and pass |
|
280 * the result to {@code setRawResult}. Otherwise splits off |
|
281 * subtasks, forking one and continuing as the other. |
|
282 * |
|
283 * <p> The method is structured to conserve resources across a |
|
284 * range of uses. The loop continues with one of the child tasks |
|
285 * when split, to avoid deep recursion. To cope with spliterators |
|
286 * that may be systematically biased toward left-heavy or |
|
287 * right-heavy splits, we alternate which child is forked versus |
|
288 * continued in the loop. |
|
289 */ |
|
290 @Override |
|
291 public void compute() { |
|
292 Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators |
|
293 long sizeEstimate = rs.estimateSize(); |
|
294 long sizeThreshold = getTargetSize(sizeEstimate); |
|
295 boolean forkRight = false; |
|
296 @SuppressWarnings("unchecked") K task = (K) this; |
|
297 while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { |
|
298 K leftChild, rightChild, taskToFork; |
|
299 task.leftChild = leftChild = task.makeChild(ls); |
|
300 task.rightChild = rightChild = task.makeChild(rs); |
|
301 task.setPendingCount(1); |
|
302 if (forkRight) { |
|
303 forkRight = false; |
|
304 rs = ls; |
|
305 task = leftChild; |
|
306 taskToFork = rightChild; |
|
307 } |
|
308 else { |
|
309 forkRight = true; |
|
310 task = rightChild; |
|
311 taskToFork = leftChild; |
|
312 } |
|
313 taskToFork.fork(); |
|
314 sizeEstimate = rs.estimateSize(); |
|
315 } |
|
316 task.setLocalResult(task.doLeaf()); |
|
317 task.tryComplete(); |
|
318 } |
|
319 |
|
320 /** |
|
321 * {@inheritDoc} |
|
322 * |
|
323 * @implNote |
|
324 * Clears spliterator and children fields. Overriders MUST call |
|
325 * {@code super.onCompletion} as the last thing they do if they want these |
|
326 * cleared. |
|
327 */ |
|
328 @Override |
|
329 public void onCompletion(CountedCompleter<?> caller) { |
|
330 spliterator = null; |
|
331 leftChild = rightChild = null; |
|
332 } |
|
333 |
|
334 /** |
|
335 * Returns whether this node is a "leftmost" node -- whether the path from |
|
336 * the root to this node involves only traversing leftmost child links. For |
|
337 * a leaf node, this means it is the first leaf node in the encounter order. |
|
338 * |
|
339 * @return {@code true} if this node is a "leftmost" node |
|
340 */ |
|
341 protected boolean isLeftmostNode() { |
|
342 @SuppressWarnings("unchecked") |
|
343 K node = (K) this; |
|
344 while (node != null) { |
|
345 K parent = node.getParent(); |
|
346 if (parent != null && parent.leftChild != node) |
|
347 return false; |
|
348 node = parent; |
|
349 } |
|
350 return true; |
|
351 } |
|
352 } |