48083
|
1 |
/*
|
49765
|
2 |
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
|
48083
|
3 |
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
|
4 |
*
|
|
5 |
* This code is free software; you can redistribute it and/or modify it
|
|
6 |
* under the terms of the GNU General Public License version 2 only, as
|
|
7 |
* published by the Free Software Foundation. Oracle designates this
|
|
8 |
* particular file as subject to the "Classpath" exception as provided
|
|
9 |
* by Oracle in the LICENSE file that accompanied this code.
|
|
10 |
*
|
|
11 |
* This code is distributed in the hope that it will be useful, but WITHOUT
|
|
12 |
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
13 |
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
14 |
* version 2 for more details (a copy is included in the LICENSE file that
|
|
15 |
* accompanied this code).
|
|
16 |
*
|
|
17 |
* You should have received a copy of the GNU General Public License version
|
|
18 |
* 2 along with this work; if not, write to the Free Software Foundation,
|
|
19 |
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
20 |
*
|
|
21 |
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
|
22 |
* or visit www.oracle.com if you need additional information or have any
|
|
23 |
* questions.
|
|
24 |
*/
|
|
25 |
|
49765
|
26 |
package jdk.internal.net.http.common;
|
48083
|
27 |
|
|
28 |
import java.util.concurrent.Executor;
|
|
29 |
import java.util.concurrent.atomic.AtomicInteger;
|
|
30 |
|
|
31 |
import static java.util.Objects.requireNonNull;
|
|
32 |
|
|
33 |
/**
 * A scheduler of ( repeatable ) tasks that MUST be run sequentially.
 *
 * <p> This class can be used as a synchronization aid that assists a number of
 * parties in running a task in a mutually exclusive fashion.
 *
 * <p> To run the task, a party invokes {@code runOrSchedule}. To permanently
 * prevent the task from subsequent runs, the party invokes {@code stop}.
 *
 * <p> The parties can, but do not have to, operate in different threads.
 *
 * <p> The task can be either synchronous ( completes when its {@code run}
 * method returns ), or asynchronous ( completed when its
 * {@code DeferredCompleter} is explicitly completed ).
 *
 * <p> The next run of the task will not begin until the previous run has
 * finished.
 *
 * <p> The task may invoke {@code runOrSchedule} itself, which may be a normal
 * situation.
 */
public final class SequentialScheduler {

    /*
       Since the task is fixed and known beforehand, no blocking synchronization
       (locks, queues, etc.) is required. The job can be done solely using
       nonblocking primitives.

       The machinery below addresses two problems:

         1. Running the task in a sequential order (no concurrent runs):

                begin, end, begin, end...

         2. Avoiding indefinite recursion:

                begin
                  end
                    begin
                      end
                        ...

       Problem #1 is solved with a finite state machine with 4 states:

           BEGIN, AGAIN, END, and STOP.

       Problem #2 is solved with a "state modifier" OFFLOAD.

       Parties invoke `runOrSchedule()` to signal the task must run. A party
       that has invoked `runOrSchedule()` either begins the task or exploits the
       party that is either beginning the task or ending it.

       The party that is trying to end the task either ends it or begins it
       again.

       To avoid indefinite recursion, before re-running the task the
       TryEndDeferredCompleter sets the OFFLOAD bit, signalling to its "child"
       TryEndDeferredCompleter that this ("parent") TryEndDeferredCompleter is
       available and the "child" must offload the task on to the "parent". Then
       a race begins. Whichever invocation of TryEndDeferredCompleter.complete
       manages to unset OFFLOAD bit first does not do the work.

       There is at most 1 thread that is beginning the task and at most 2
       threads that are trying to end it: "parent" and "child". In case of a
       synchronous task "parent" and "child" are the same thread.
     */

    /**
     * An interface to signal the completion of a {@link RestartableTask}.
     *
     * <p> The invocation of {@code complete} completes the task. The invocation
     * of {@code complete} may restart the task, if an attempt has previously
     * been made to run the task while it was already running.
     *
     * @apiNote {@code DeferredCompleter} is useful when a task is not
     * necessarily complete when its {@code run} method returns, but will
     * complete at a later time, and maybe in a different thread. This type
     * exists for readability purposes at use-sites only.
     */
    public abstract static class DeferredCompleter {

        /** Extensible from this (outer) class ONLY. */
        private DeferredCompleter() { }

        /** Completes the task. Must be called once, and once only. */
        public abstract void complete();
    }

    /**
     * A restartable task.
     */
    @FunctionalInterface
    public interface RestartableTask {

        /**
         * The body of the task.
         *
         * @param taskCompleter
         *         A completer that must be invoked once, and only once,
         *         when this task is logically finished
         */
        void run(DeferredCompleter taskCompleter);
    }

    /**
     * A simple and self-contained task that completes once its {@code run}
     * method returns.
     */
    public abstract static class CompleteRestartableTask
        implements RestartableTask
    {
        @Override
        public final void run(DeferredCompleter taskCompleter) {
            try {
                run();
            } finally {
                // Complete even if the body throws, so that the scheduler
                // does not remain in a "running" state.
                taskCompleter.complete();
            }
        }

        /** The body of the task. */
        protected abstract void run();
    }

    /**
     * A task that runs its main loop within a synchronized block to provide
     * memory visibility between runs. Since the main loop can't run concurrently,
     * the lock shouldn't be contended and no deadlock should ever be possible.
     */
    public static final class SynchronizedRestartableTask
            extends CompleteRestartableTask {

        private final Runnable mainLoop;
        private final Object lock = new Object();

        /**
         * Creates a task that runs the given main loop while holding a
         * private lock.
         *
         * @param mainLoop
         *         The body to execute on each run of the task
         *
         * @throws NullPointerException
         *         if {@code mainLoop} is {@code null}
         */
        public SynchronizedRestartableTask(Runnable mainLoop) {
            // Fail fast here rather than with an NPE on the first run, which
            // may happen much later and in a different thread.
            this.mainLoop = requireNonNull(mainLoop);
        }

        @Override
        protected void run() {
            synchronized (lock) {
                mainLoop.run();
            }
        }
    }

    /** State modifier: a "parent" completer is available to re-run the task. */
    private static final int OFFLOAD =  1;
    /** The task is running and has been asked to run again. */
    private static final int AGAIN   =  2;
    /** The task has begun (is being started or is running). */
    private static final int BEGIN   =  4;
    /** The scheduler has been permanently stopped. */
    private static final int STOP    =  8;
    /** The task is not running; this is the initial state. */
    private static final int END     = 16;

    private final AtomicInteger state = new AtomicInteger(END);
    private final RestartableTask restartableTask;
    private final DeferredCompleter completer;
    private final SchedulableTask schedulableTask;

    /**
     * An auxiliary task that starts the restartable task:
     * {@code restartableTask.run(completer)}.
     */
    private final class SchedulableTask implements Runnable {
        @Override
        public void run() {
            restartableTask.run(completer);
        }
    }

    /**
     * Creates a scheduler for the given task.
     *
     * @param restartableTask
     *         The task to run sequentially
     *
     * @throws NullPointerException
     *         if {@code restartableTask} is {@code null}
     */
    public SequentialScheduler(RestartableTask restartableTask) {
        this.restartableTask = requireNonNull(restartableTask);
        this.completer = new TryEndDeferredCompleter();
        this.schedulableTask = new SchedulableTask();
    }

    /**
     * Runs or schedules the task to be run.
     *
     * @implSpec The recursion which is possible here must be bounded:
     *
     *  <pre>{@code
     *     this.runOrSchedule()
     *         completer.complete()
     *             this.runOrSchedule()
     *                 ...
     * }</pre>
     *
     * @implNote The recursion in this implementation has the maximum
     * depth of 1.
     */
    public void runOrSchedule() {
        runOrSchedule(schedulableTask, null);
    }

    /**
     * Executes or schedules the task to be executed in the provided executor.
     *
     * <p> This method can be used when potentially executing the task in the
     * calling thread is not desirable.
     *
     * @param executor
     *         An executor in which to execute the task, if the task needs
     *         to be executed.
     *
     * @apiNote The given executor can be {@code null} in which case calling
     * {@code runOrSchedule(null)} is strictly equivalent to calling
     * {@code runOrSchedule()}.
     */
    public void runOrSchedule(Executor executor) {
        runOrSchedule(schedulableTask, executor);
    }

    /**
     * Moves the state machine forward and, if this call is the one that
     * begins the task, runs it directly or hands it to the executor.
     *
     * @param task
     *         The runnable that starts the restartable task
     * @param executor
     *         The executor to hand the task to, or {@code null} to run the
     *         task in the calling thread
     */
    private void runOrSchedule(SchedulableTask task, Executor executor) {
        while (true) {
            int s = state.get();
            if (s == END) {
                // Idle: the party that wins this CAS begins the task.
                if (state.compareAndSet(END, BEGIN)) {
                    break;
                }
            } else if ((s & BEGIN) != 0) {
                // Tries to change the state to AGAIN, preserving OFFLOAD bit
                if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) {
                    return;
                }
            } else if ((s & AGAIN) != 0 || s == STOP) {
                /* In the case of AGAIN the scheduler does not provide
                   happens-before relationship between actions prior to
                   runOrSchedule() and actions that happen in task.run().
                   The reason is that no volatile write is done in this case,
                   and the call piggybacks on the call that has actually set
                   AGAIN state. */
                return;
            } else {
                // Non-existent state, or the one that cannot be offloaded
                throw new InternalError(String.valueOf(s));
            }
        }
        if (executor == null) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    /** The only concrete {@code DeferredCompleter} implementation. */
    private final class TryEndDeferredCompleter extends DeferredCompleter {

        /**
         * Ends the current run of the task. If a re-run was requested in the
         * meantime, the task is run again from this invocation, unless the
         * ending can be offloaded to a "parent" invocation (OFFLOAD bit set).
         *
         * @throws IllegalStateException
         *         if the task has already been completed
         */
        @Override
        public void complete() {
            while (true) {
                int s;
                while (((s = state.get()) & OFFLOAD) != 0) {
                    // Tries to offload ending of the task to the parent
                    if (state.compareAndSet(s, s & ~OFFLOAD)) {
                        return;
                    }
                }
                while (true) {
                    if ((s & OFFLOAD) != 0) {
                        /* OFFLOAD bit can never be observed here. Otherwise
                           it would mean there is another invocation of
                           "complete" that can run the task. */
                        throw new InternalError(String.valueOf(s));
                    }
                    if (s == BEGIN) {
                        // Nobody asked for a re-run: the task simply ends.
                        if (state.compareAndSet(BEGIN, END)) {
                            return;
                        }
                    } else if (s == AGAIN) {
                        // A re-run was requested: begin again and advertise
                        // this invocation as the "parent" via OFFLOAD.
                        if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) {
                            break;
                        }
                    } else if (s == STOP) {
                        return;
                    } else if (s == END) {
                        throw new IllegalStateException("Duplicate completion");
                    } else {
                        // Non-existent state
                        throw new InternalError(String.valueOf(s));
                    }
                    s = state.get();
                }
                restartableTask.run(completer);
            }
        }
    }

    /**
     * Tells whether, or not, this scheduler has been permanently stopped.
     *
     * <p> Should be used from inside the task to poll the status of the
     * scheduler, pretty much the same way as it is done for threads:
     * <pre>{@code
     *     if (!Thread.currentThread().isInterrupted()) {
     *         ...
     *     }
     * }</pre>
     *
     * @return {@code true} if this scheduler has been stopped
     */
    public boolean isStopped() {
        return state.get() == STOP;
    }

    /**
     * Stops this scheduler.  Subsequent invocations of {@code runOrSchedule}
     * are effectively no-ops.
     *
     * <p> If the task has already begun, this invocation will not affect it,
     * unless the task itself uses {@code isStopped()} method to check the state
     * of the scheduler.
     */
    public void stop() {
        state.set(STOP);
    }

    /**
     * Returns a new {@code SequentialScheduler} that executes the provided
     * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
     *
     * @apiNote This is equivalent to calling
     * {@code new SequentialScheduler(new SynchronizedRestartableTask(mainLoop))}.
     * The main loop must not perform any blocking operation.
     *
     * @param mainLoop The main loop of the new sequential scheduler
     * @return a new {@code SequentialScheduler} that executes the provided
     * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
     */
    public static SequentialScheduler synchronizedScheduler(Runnable mainLoop) {
        return new SequentialScheduler(new SynchronizedRestartableTask(mainLoop));
    }
}
|