author | chegar |
Wed, 07 Feb 2018 21:45:37 +0000 | |
branch | http-client-branch |
changeset 56092 | fd85b2bf2b0d |
parent 56089 | src/java.net.http/share/classes/java/net/http/internal/common/SequentialScheduler.java@42208b2f224e |
child 56451 | 9585061fdb04 |
permissions | -rw-r--r-- |
48083 | 1 |
/* |
56040 | 2 |
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. |
48083 | 3 |
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 |
* |
|
5 |
* This code is free software; you can redistribute it and/or modify it |
|
6 |
* under the terms of the GNU General Public License version 2 only, as |
|
7 |
* published by the Free Software Foundation. Oracle designates this |
|
8 |
* particular file as subject to the "Classpath" exception as provided |
|
9 |
* by Oracle in the LICENSE file that accompanied this code. |
|
10 |
* |
|
11 |
* This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 |
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 |
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 |
* version 2 for more details (a copy is included in the LICENSE file that |
|
15 |
* accompanied this code). |
|
16 |
* |
|
17 |
* You should have received a copy of the GNU General Public License version |
|
18 |
* 2 along with this work; if not, write to the Free Software Foundation, |
|
19 |
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 |
* |
|
21 |
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 |
* or visit www.oracle.com if you need additional information or have any |
|
23 |
* questions. |
|
24 |
*/ |
|
25 |
||
56092
fd85b2bf2b0d
http-client-branch: move implementation to jdk.internal.net.http
chegar
parents:
56089
diff
changeset
|
26 |
package jdk.internal.net.http.common; |
48083 | 27 |
|
28 |
import java.util.concurrent.Executor; |
|
29 |
import java.util.concurrent.atomic.AtomicInteger; |
|
30 |
||
31 |
import static java.util.Objects.requireNonNull; |
|
32 |
||
33 |
/** |
|
34 |
* A scheduler of ( repeatable ) tasks that MUST be run sequentially. |
|
35 |
* |
|
36 |
* <p> This class can be used as a synchronization aid that assists a number of |
|
37 |
* parties in running a task in a mutually exclusive fashion. |
|
38 |
* |
|
39 |
* <p> To run the task, a party invokes {@code runOrSchedule}. To permanently |
|
40 |
* prevent the task from subsequent runs, the party invokes {@code stop}. |
|
41 |
* |
|
42 |
* <p> The parties can, but do not have to, operate in different threads. |
|
43 |
* |
|
44 |
* <p> The task can be either synchronous ( completes when its {@code run} |
|
45 |
* method returns ), or asynchronous ( completed when its |
|
46 |
* {@code DeferredCompleter} is explicitly completed ). |
|
47 |
* |
|
48 |
* <p> The next run of the task will not begin until the previous run has |
|
49 |
* finished. |
|
50 |
* |
|
51 |
* <p> The task may invoke {@code runOrSchedule} itself, which may be a normal |
|
52 |
* situation. |
|
53 |
*/ |
|
54 |
public final class SequentialScheduler { |
|
55 |
||
56 |
/* |
|
57 |
Since the task is fixed and known beforehand, no blocking synchronization |
|
58 |
(locks, queues, etc.) is required. The job can be done solely using |
|
59 |
nonblocking primitives. |
|
60 |
||
61 |
The machinery below addresses two problems: |
|
62 |
||
63 |
1. Running the task in a sequential order (no concurrent runs): |
|
64 |
||
65 |
begin, end, begin, end... |
|
66 |
||
67 |
2. Avoiding indefinite recursion: |
|
68 |
||
69 |
begin |
|
70 |
end |
|
71 |
begin |
|
72 |
end |
|
73 |
... |
|
74 |
||
75 |
Problem #1 is solved with a finite state machine with 4 states: |
|
76 |
||
77 |
BEGIN, AGAIN, END, and STOP. |
|
78 |
||
79 |
Problem #2 is solved with a "state modifier" OFFLOAD. |
|
80 |
||
81 |
Parties invoke `runOrSchedule()` to signal the task must run. A party |
|
82 |
that has invoked `runOrSchedule()` either begins the task or exploits the |
|
83 |
party that is either beginning the task or ending it. |
|
84 |
||
85 |
The party that is trying to end the task either ends it or begins it |
|
86 |
again. |
|
87 |
||
88 |
To avoid indefinite recursion, before re-running the task the |
|
89 |
TryEndDeferredCompleter sets the OFFLOAD bit, signalling to its "child" |
|
90 |
TryEndDeferredCompleter that this ("parent") TryEndDeferredCompleter is |
|
91 |
available and the "child" must offload the task on to the "parent". Then |
|
92 |
a race begins. Whichever invocation of TryEndDeferredCompleter.complete |
|
93 |
manages to unset OFFLOAD bit first does not do the work. |
|
94 |
||
95 |
There is at most 1 thread that is beginning the task and at most 2 |
|
96 |
threads that are trying to end it: "parent" and "child". In case of a |
|
97 |
synchronous task "parent" and "child" are the same thread. |
|
98 |
*/ |
|
99 |
||
100 |
/** |
|
101 |
* An interface to signal the completion of a {@link RestartableTask}. |
|
102 |
* |
|
103 |
* <p> The invocation of {@code complete} completes the task. The invocation |
|
104 |
* of {@code complete} may restart the task, if an attempt has previously |
|
105 |
* been made to run the task while it was already running. |
|
106 |
* |
|
107 |
* @apiNote {@code DeferredCompleter} is useful when a task is not necessary |
|
108 |
* complete when its {@code run} method returns, but will complete at a |
|
109 |
* later time, and maybe in different thread. This type exists for |
|
110 |
* readability purposes at use-sites only. |
|
111 |
*/ |
|
112 |
public static abstract class DeferredCompleter { |
|
113 |
||
114 |
/** Extensible from this (outer) class ONLY. */ |
|
115 |
private DeferredCompleter() { } |
|
116 |
||
117 |
/** Completes the task. Must be called once, and once only. */ |
|
118 |
public abstract void complete(); |
|
119 |
} |
|
120 |
||
121 |
/** |
|
122 |
* A restartable task. |
|
123 |
*/ |
|
124 |
@FunctionalInterface |
|
125 |
public interface RestartableTask { |
|
126 |
||
127 |
/** |
|
128 |
* The body of the task. |
|
129 |
* |
|
130 |
* @param taskCompleter |
|
131 |
* A completer that must be invoked once, and only once, |
|
132 |
* when this task is logically finished |
|
133 |
*/ |
|
134 |
void run(DeferredCompleter taskCompleter); |
|
135 |
} |
|
136 |
||
137 |
/** |
|
56040 | 138 |
* A simple and self-contained task that completes once its {@code run} |
139 |
* method returns. |
|
48083 | 140 |
*/ |
141 |
public static abstract class CompleteRestartableTask |
|
142 |
implements RestartableTask |
|
143 |
{ |
|
144 |
@Override |
|
145 |
public final void run(DeferredCompleter taskCompleter) { |
|
146 |
try { |
|
147 |
run(); |
|
148 |
} finally { |
|
149 |
taskCompleter.complete(); |
|
150 |
} |
|
151 |
} |
|
152 |
||
153 |
/** The body of the task. */ |
|
154 |
protected abstract void run(); |
|
155 |
} |
|
156 |
||
157 |
/** |
|
56040 | 158 |
* A task that runs its main loop within a synchronized block to provide |
159 |
* memory visibility between runs. Since the main loop can't run concurrently, |
|
160 |
* the lock shouldn't be contended and no deadlock should ever be possible. |
|
48083 | 161 |
*/ |
162 |
public static final class SynchronizedRestartableTask |
|
163 |
extends CompleteRestartableTask { |
|
56040 | 164 |
|
48083 | 165 |
private final Runnable mainLoop; |
166 |
private final Object lock = new Object(); |
|
56040 | 167 |
|
48083 | 168 |
public SynchronizedRestartableTask(Runnable mainLoop) { |
169 |
this.mainLoop = mainLoop; |
|
170 |
} |
|
171 |
||
172 |
@Override |
|
173 |
protected void run() { |
|
174 |
synchronized(lock) { |
|
175 |
mainLoop.run(); |
|
176 |
} |
|
177 |
} |
|
178 |
} |
|
179 |
||
180 |
private static final int OFFLOAD = 1; |
|
181 |
private static final int AGAIN = 2; |
|
182 |
private static final int BEGIN = 4; |
|
183 |
private static final int STOP = 8; |
|
184 |
private static final int END = 16; |
|
185 |
||
186 |
private final AtomicInteger state = new AtomicInteger(END); |
|
187 |
private final RestartableTask restartableTask; |
|
188 |
private final DeferredCompleter completer; |
|
189 |
private final SchedulableTask schedulableTask; |
|
190 |
||
191 |
/** |
|
56040 | 192 |
* An auxiliary task that starts the restartable task: |
48083 | 193 |
* {@code restartableTask.run(completer)}. |
194 |
*/ |
|
195 |
private final class SchedulableTask implements Runnable { |
|
196 |
@Override |
|
197 |
public void run() { |
|
198 |
restartableTask.run(completer); |
|
199 |
} |
|
200 |
} |
|
201 |
||
202 |
public SequentialScheduler(RestartableTask restartableTask) { |
|
203 |
this.restartableTask = requireNonNull(restartableTask); |
|
204 |
this.completer = new TryEndDeferredCompleter(); |
|
205 |
this.schedulableTask = new SchedulableTask(); |
|
206 |
} |
|
207 |
||
208 |
/** |
|
209 |
* Runs or schedules the task to be run. |
|
210 |
* |
|
211 |
* @implSpec The recursion which is possible here must be bounded: |
|
212 |
* |
|
213 |
* <pre>{@code |
|
214 |
* this.runOrSchedule() |
|
215 |
* completer.complete() |
|
216 |
* this.runOrSchedule() |
|
217 |
* ... |
|
218 |
* }</pre> |
|
219 |
* |
|
220 |
* @implNote The recursion in this implementation has the maximum |
|
221 |
* depth of 1. |
|
222 |
*/ |
|
223 |
public void runOrSchedule() { |
|
224 |
runOrSchedule(schedulableTask, null); |
|
225 |
} |
|
226 |
||
227 |
/** |
|
56040 | 228 |
* Executes or schedules the task to be executed in the provided executor. |
48083 | 229 |
* |
230 |
* <p> This method can be used when potential executing from a calling |
|
231 |
* thread is not desirable. |
|
232 |
* |
|
233 |
* @param executor |
|
234 |
* An executor in which to execute the task, if the task needs |
|
235 |
* to be executed. |
|
236 |
* |
|
237 |
* @apiNote The given executor can be {@code null} in which case calling |
|
56040 | 238 |
* {@code runOrSchedule(null)} is strictly equivalent to calling |
48083 | 239 |
* {@code runOrSchedule()}. |
240 |
*/ |
|
56040 | 241 |
public void runOrSchedule(Executor executor) { |
48083 | 242 |
runOrSchedule(schedulableTask, executor); |
243 |
} |
|
244 |
||
245 |
private void runOrSchedule(SchedulableTask task, Executor executor) { |
|
246 |
while (true) { |
|
247 |
int s = state.get(); |
|
248 |
if (s == END) { |
|
249 |
if (state.compareAndSet(END, BEGIN)) { |
|
250 |
break; |
|
251 |
} |
|
252 |
} else if ((s & BEGIN) != 0) { |
|
253 |
// Tries to change the state to AGAIN, preserving OFFLOAD bit |
|
254 |
if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) { |
|
255 |
return; |
|
256 |
} |
|
257 |
} else if ((s & AGAIN) != 0 || s == STOP) { |
|
258 |
/* In the case of AGAIN the scheduler does not provide |
|
259 |
happens-before relationship between actions prior to |
|
260 |
runOrSchedule() and actions that happen in task.run(). |
|
261 |
The reason is that no volatile write is done in this case, |
|
262 |
and the call piggybacks on the call that has actually set |
|
263 |
AGAIN state. */ |
|
264 |
return; |
|
265 |
} else { |
|
266 |
// Non-existent state, or the one that cannot be offloaded |
|
267 |
throw new InternalError(String.valueOf(s)); |
|
268 |
} |
|
269 |
} |
|
270 |
if (executor == null) { |
|
271 |
task.run(); |
|
272 |
} else { |
|
273 |
executor.execute(task); |
|
274 |
} |
|
275 |
} |
|
276 |
||
277 |
/** The only concrete {@code DeferredCompleter} implementation. */ |
|
278 |
private class TryEndDeferredCompleter extends DeferredCompleter { |
|
279 |
||
280 |
@Override |
|
281 |
public void complete() { |
|
282 |
while (true) { |
|
283 |
int s; |
|
284 |
while (((s = state.get()) & OFFLOAD) != 0) { |
|
285 |
// Tries to offload ending of the task to the parent |
|
286 |
if (state.compareAndSet(s, s & ~OFFLOAD)) { |
|
287 |
return; |
|
288 |
} |
|
289 |
} |
|
290 |
while (true) { |
|
291 |
if ((s & OFFLOAD) != 0) { |
|
292 |
/* OFFLOAD bit can never be observed here. Otherwise |
|
293 |
it would mean there is another invocation of |
|
294 |
"complete" that can run the task. */ |
|
295 |
throw new InternalError(String.valueOf(s)); |
|
296 |
} |
|
297 |
if (s == BEGIN) { |
|
298 |
if (state.compareAndSet(BEGIN, END)) { |
|
299 |
return; |
|
300 |
} |
|
301 |
} else if (s == AGAIN) { |
|
302 |
if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) { |
|
303 |
break; |
|
304 |
} |
|
305 |
} else if (s == STOP) { |
|
306 |
return; |
|
307 |
} else if (s == END) { |
|
308 |
throw new IllegalStateException("Duplicate completion"); |
|
309 |
} else { |
|
310 |
// Non-existent state |
|
311 |
throw new InternalError(String.valueOf(s)); |
|
312 |
} |
|
313 |
s = state.get(); |
|
314 |
} |
|
315 |
restartableTask.run(completer); |
|
316 |
} |
|
317 |
} |
|
318 |
} |
|
319 |
||
320 |
/** |
|
321 |
* Tells whether, or not, this scheduler has been permanently stopped. |
|
322 |
* |
|
323 |
* <p> Should be used from inside the task to poll the status of the |
|
324 |
* scheduler, pretty much the same way as it is done for threads: |
|
325 |
* <pre>{@code |
|
326 |
* if (!Thread.currentThread().isInterrupted()) { |
|
327 |
* ... |
|
328 |
* } |
|
329 |
* }</pre> |
|
330 |
*/ |
|
331 |
public boolean isStopped() { |
|
332 |
return state.get() == STOP; |
|
333 |
} |
|
334 |
||
335 |
/** |
|
336 |
* Stops this scheduler. Subsequent invocations of {@code runOrSchedule} |
|
337 |
* are effectively no-ops. |
|
338 |
* |
|
339 |
* <p> If the task has already begun, this invocation will not affect it, |
|
340 |
* unless the task itself uses {@code isStopped()} method to check the state |
|
341 |
* of the handler. |
|
342 |
*/ |
|
343 |
public void stop() { |
|
344 |
state.set(STOP); |
|
345 |
} |
|
346 |
||
347 |
/** |
|
348 |
* Returns a new {@code SequentialScheduler} that executes the provided |
|
349 |
* {@code mainLoop} from within a {@link SynchronizedRestartableTask}. |
|
350 |
* |
|
56040 | 351 |
* @apiNote This is equivalent to calling |
352 |
* {@code new SequentialScheduler(new SynchronizedRestartableTask(mainLoop))} |
|
353 |
* The main loop must not perform any blocking operation. |
|
48083 | 354 |
* |
56040 | 355 |
* @param mainLoop The main loop of the new sequential scheduler |
48083 | 356 |
* @return a new {@code SequentialScheduler} that executes the provided |
357 |
* {@code mainLoop} from within a {@link SynchronizedRestartableTask}. |
|
358 |
*/ |
|
56040 | 359 |
public static SequentialScheduler synchronizedScheduler(Runnable mainLoop) { |
360 |
return new SequentialScheduler(new SynchronizedRestartableTask(mainLoop)); |
|
48083 | 361 |
} |
362 |
} |