|
1 /* |
|
2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 |
|
26 package jdk.incubator.http.internal.common; |
|
27 |
|
28 import java.util.Map; |
|
29 import java.util.Objects; |
|
30 import java.util.Optional; |
|
31 import java.util.WeakHashMap; |
|
32 import java.util.concurrent.Flow.Publisher; |
|
33 import java.util.concurrent.Flow.Subscriber; |
|
34 import java.util.concurrent.Flow.Subscription; |
|
35 import java.util.concurrent.atomic.AtomicInteger; |
|
36 import java.util.concurrent.atomic.AtomicReference; |
|
37 import java.util.concurrent.locks.ReentrantLock; |
|
38 |
|
39 /** |
|
40 * This publisher signals {@code onNext} synchronously and |
|
41 * {@code onComplete}/{@code onError} asynchronously to its only subscriber. |
|
42 * |
|
43 * <p> This publisher supports a single subscriber over this publisher's |
|
44 * lifetime. {@code signalComplete} and {@code signalError} may be called before |
|
45 * the subscriber has subscribed. |
|
46 * |
|
47 * <p> The subscriber's requests are signalled to the subscription supplied to |
|
48 * the {@code feedback} method. |
|
49 * |
|
50 * <p> {@code subscribe} and {@code feedback} methods can be called in any |
|
51 * order. |
|
52 * |
|
53 * <p> {@code signalNext} may be called recursively, the implementation will |
|
54 * bound the depth of the recursion. |
|
55 * |
|
56 * <p> It is always an error to call {@code signalNext} without a sufficient |
|
57 * demand. |
|
58 * |
|
59 * <p> If subscriber throws an exception from any of its methods, the |
|
60 * subscription will be cancelled. |
|
61 */ |
|
62 public final class SynchronousPublisher<T> implements Publisher<T> { |
|
63 /* |
|
64 * PENDING, ACTIVE and CANCELLED are states. TERMINATE and DELIVERING are |
|
65 * state modifiers, they cannot appear in the state on their own. |
|
66 * |
|
67 * PENDING, ACTIVE and CANCELLED are mutually exclusive states. Any two of |
|
68 * those bits cannot be set at the same time in state. |
|
69 * |
|
70 * PENDING -----------------> ACTIVE <------> DELIVERING |
|
71 * | | |
|
72 * +------> TERMINATE <------+ |
|
73 * | | | |
|
74 * | v | |
|
75 * +------> CANCELLED <------+ |
|
76 * |
|
77 * The following states are allowed: |
|
78 * |
|
79 * PENDING |
|
80 * PENDING | TERMINATE, |
|
81 * ACTIVE, |
|
82 * ACTIVE | DELIVERING, |
|
83 * ACTIVE | TERMINATE, |
|
84 * ACTIVE | DELIVERING | TERMINATE |
|
85 * CANCELLED |
|
86 */ |
|
87 /** |
|
88 * A state modifier meaning {@code onSubscribe} has not been called yet. |
|
89 * |
|
90 * <p> After {@code onSubscribe} has been called the machine can transition |
|
91 * into {@code ACTIVE}, {@code PENDING | TERMINATE} or {@code CANCELLED}. |
|
92 */ |
|
93 private static final int PENDING = 1; |
|
94 /** |
|
95 * A state modifier meaning {@code onSubscribe} has been called, no error |
|
96 * and no completion has been signalled and {@code onNext} may be called. |
|
97 */ |
|
98 private static final int ACTIVE = 2; |
|
99 /** |
|
100 * A state modifier meaning no calls to subscriber may be made. |
|
101 * |
|
102 * <p> Once this modifier is set, it will not be unset. It's a final state. |
|
103 */ |
|
104 private static final int CANCELLED = 4; |
|
105 /** |
|
106 * A state modifier meaning {@code onNext} is being called and no other |
|
107 * signal may be made. |
|
108 * |
|
109 * <p> This bit can be set at any time. signalNext uses it to ensure the |
|
110 * method is called sequentially. |
|
111 */ |
|
112 private static final int DELIVERING = 8; |
|
113 /** |
|
114 * A state modifier meaning the next call must be either {@code onComplete} |
|
115 * or {@code onError}. |
|
116 * |
|
117 * <p> The concrete method depends on the value of {@code terminationType}). |
|
118 * {@code TERMINATE} bit cannot appear on its own, it can be set only with |
|
119 * {@code PENDING} or {@code ACTIVE}. |
|
120 */ |
|
121 private static final int TERMINATE = 16; |
|
122 /** |
|
123 * Current demand. If fulfilled, no {@code onNext} signals may be made. |
|
124 */ |
|
125 private final Demand demand = new Demand(); |
|
126 /** |
|
127 * The current state of the subscription. Contains disjunctions of the above |
|
128 * state modifiers. |
|
129 */ |
|
130 private final AtomicInteger state = new AtomicInteger(PENDING); |
|
131 /** |
|
132 * A convenient way to represent 3 values: not set, completion and error. |
|
133 */ |
|
134 private final AtomicReference<Optional<Throwable>> terminationType |
|
135 = new AtomicReference<>(); |
|
136 /** |
|
137 * {@code signalNext} uses this lock to ensure the method is called in a |
|
138 * thread-safe manner. |
|
139 */ |
|
140 private final ReentrantLock nextLock = new ReentrantLock(); |
|
141 private T next; |
|
142 |
|
143 private final Object lock = new Object(); |
|
144 /** |
|
145 * This map stores the subscribers attempted to subscribe to this publisher. |
|
146 * It is needed so this publisher does not call {@code onSubscribe} on a |
|
147 * subscriber more than once (Rule 2.12). |
|
148 * |
|
149 * <p> It will most likely have a single entry for the only subscriber. |
|
150 * Because this publisher is one-off, subscribing to it more than once is an |
|
151 * error. |
|
152 */ |
|
153 private final Map<Subscriber<?>, Object> knownSubscribers |
|
154 = new WeakHashMap<>(1, 1); |
|
155 /** |
|
156 * The active subscriber. This reference will be reset to {@code null} once |
|
157 * the subscription becomes cancelled (Rule 3.13). |
|
158 */ |
|
159 private volatile Subscriber<? super T> subscriber; |
|
160 /** |
|
161 * A temporary subscription that receives all calls to |
|
162 * {@code request}/{@code cancel} until two things happen: (1) the feedback |
|
163 * becomes set and (2) {@code onSubscribe} method is called on the |
|
164 * subscriber. |
|
165 * |
|
166 * <p> The first condition is obvious. The second one is about not |
|
167 * propagating requests to {@code feedback} until {@code onSubscribe} call |
|
168 * has been finished. The reason is that Rule 1.3 requires the subscriber |
|
169 * methods to be called in a thread-safe manner. This, in particular, |
|
170 * implies that if called from multiple threads, the calls must not be |
|
171 * concurrent. If, for instance, {@code subscription.request(long)) (and |
|
172 * this is a usual state of affairs) is called from within |
|
173 * {@code onSubscribe} call, the publisher will have to resort to some sort |
|
174 * of queueing (locks, queues, etc.) of possibly arriving {@code onNext} |
|
175 * signals while in {@code onSubscribe}. This publisher doesn't queue |
|
176 * signals, instead it "queues" requests. Because requests are just numbers |
|
177 * and requests are additive, the effective queue is a single number of |
|
178 * total requests made so far. |
|
179 */ |
|
180 private final TemporarySubscription temporarySubscription |
|
181 = new TemporarySubscription(); |
|
182 private volatile Subscription feedback; |
|
183 /** |
|
184 * Keeping track of whether a subscription may be made. (The {@code |
|
185 * subscriber} field may later become {@code null}, but this flag is |
|
186 * permanent. Once {@code true} forever {@code true}. |
|
187 */ |
|
188 private boolean subscribed; |
|
189 |
|
190 @Override |
|
191 public void subscribe(Subscriber<? super T> sub) { |
|
192 Objects.requireNonNull(sub); |
|
193 boolean success = false; |
|
194 boolean duplicate = false; |
|
195 synchronized (lock) { |
|
196 if (!subscribed) { |
|
197 subscribed = true; |
|
198 subscriber = sub; |
|
199 assert !knownSubscribers.containsKey(subscriber); |
|
200 knownSubscribers.put(subscriber, null); |
|
201 success = true; |
|
202 } else if (sub.equals(subscriber)) { |
|
203 duplicate = true; |
|
204 } else if (!knownSubscribers.containsKey(sub)) { |
|
205 knownSubscribers.put(sub, null); |
|
206 } else { |
|
207 return; |
|
208 } |
|
209 } |
|
210 if (success) { |
|
211 signalSubscribe(); |
|
212 } else if (duplicate) { |
|
213 signalError(new IllegalStateException("Duplicate subscribe")); |
|
214 } else { |
|
215 // This is a best-effort attempt for an isolated publisher to call |
|
216 // a foreign subscriber's methods in a sequential order. However it |
|
217 // cannot be guaranteed unless all publishers share information on |
|
218 // all subscribers in the system. This publisher does its job right. |
|
219 sub.onSubscribe(new NopSubscription()); |
|
220 sub.onError(new IllegalStateException("Already subscribed")); |
|
221 } |
|
222 } |
|
223 |
|
224 /** |
|
225 * Accepts a subscription that is signalled with the subscriber's requests. |
|
226 * |
|
227 * @throws NullPointerException |
|
228 * if {@code subscription} is {@code null} |
|
229 * @throws IllegalStateException |
|
230 * if there is a feedback subscription already |
|
231 */ |
|
232 public void feedback(Subscription subscription) { |
|
233 Objects.requireNonNull(subscription); |
|
234 synchronized (lock) { |
|
235 if (feedback != null) { |
|
236 throw new IllegalStateException( |
|
237 "Already has a feedback subscription"); |
|
238 } |
|
239 feedback = subscription; |
|
240 if ((state.get() & PENDING) == 0) { |
|
241 temporarySubscription.replaceWith(new PermanentSubscription()); |
|
242 } |
|
243 } |
|
244 } |
|
245 |
|
246 /** |
|
247 * Tries to deliver the specified item to the subscriber. |
|
248 * |
|
249 * <p> The item may not be delivered even if there is a demand. This can |
|
250 * happen as a result of subscriber cancelling the subscription by |
|
251 * signalling {@code cancel} or this publisher cancelling the subscription |
|
252 * by signaling {@code onError} or {@code onComplete}. |
|
253 * |
|
254 * <p> Given no exception is thrown, a call to this method decremented the |
|
255 * demand. |
|
256 * |
|
257 * @param item |
|
258 * the item to deliver to the subscriber |
|
259 * |
|
260 * @return {@code true} iff the subscriber has received {@code item} |
|
261 * @throws NullPointerException |
|
262 * if {@code item} is {@code null} |
|
263 * @throws IllegalStateException |
|
264 * if there is no demand |
|
265 * @throws IllegalStateException |
|
266 * the method is called concurrently |
|
267 */ |
|
268 public boolean signalNext(T item) { |
|
269 Objects.requireNonNull(item); |
|
270 if (!nextLock.tryLock()) { |
|
271 throw new IllegalStateException("Concurrent signalling"); |
|
272 } |
|
273 boolean recursion = false; |
|
274 try { |
|
275 next = item; |
|
276 while (true) { |
|
277 int s = state.get(); |
|
278 if ((s & DELIVERING) == DELIVERING) { |
|
279 recursion = true; |
|
280 break; |
|
281 } else if (state.compareAndSet(s, s | DELIVERING)) { |
|
282 break; |
|
283 } |
|
284 } |
|
285 if (!demand.tryDecrement()) { |
|
286 // Hopefully this will help to find bugs in this publisher's |
|
287 // clients. Because signalNext should never be issues without |
|
288 // having a sufficient demand. Even if the thing is cancelled! |
|
289 // next = null; |
|
290 throw new IllegalStateException("No demand"); |
|
291 } |
|
292 if (recursion) { |
|
293 return true; |
|
294 } |
|
295 while (next != null) { |
|
296 int s = state.get(); |
|
297 if ((s & (ACTIVE | TERMINATE)) == (ACTIVE | TERMINATE)) { |
|
298 if (state.compareAndSet( |
|
299 s, CANCELLED | (s & ~(ACTIVE | TERMINATE)))) { |
|
300 // terminationType must be read only after the |
|
301 // termination condition has been observed |
|
302 // (those have been stored in the opposite order) |
|
303 Optional<Throwable> t = terminationType.get(); |
|
304 dispatchTerminationAndUnsubscribe(t); |
|
305 return false; |
|
306 } |
|
307 } else if ((s & ACTIVE) == ACTIVE) { |
|
308 try { |
|
309 T t = next; |
|
310 next = null; |
|
311 subscriber.onNext(t); |
|
312 } catch (Throwable t) { |
|
313 cancelNow(); |
|
314 throw t; |
|
315 } |
|
316 } else if ((s & CANCELLED) == CANCELLED) { |
|
317 return false; |
|
318 } else if ((s & PENDING) == PENDING) { |
|
319 // Actually someone called signalNext even before |
|
320 // onSubscribe has been called, but from this publisher's |
|
321 // API point of view it's still "No demand" |
|
322 throw new IllegalStateException("No demand"); |
|
323 } else { |
|
324 throw new InternalError(String.valueOf(s)); |
|
325 } |
|
326 } |
|
327 return true; |
|
328 } finally { |
|
329 while (!recursion) { // If the call was not recursive unset the bit |
|
330 int s = state.get(); |
|
331 if ((s & DELIVERING) != DELIVERING) { |
|
332 throw new InternalError(String.valueOf(s)); |
|
333 } else if (state.compareAndSet(s, s & ~DELIVERING)) { |
|
334 break; |
|
335 } |
|
336 } |
|
337 nextLock.unlock(); |
|
338 } |
|
339 } |
|
340 |
|
341 /** |
|
342 * Cancels the subscription by signalling {@code onError} to the subscriber. |
|
343 * |
|
344 * <p> Will not signal {@code onError} if the subscription has been |
|
345 * cancelled already. |
|
346 * |
|
347 * <p> This method may be called at any time. |
|
348 * |
|
349 * @param error |
|
350 * the error to signal |
|
351 * |
|
352 * @throws NullPointerException |
|
353 * if {@code error} is {@code null} |
|
354 */ |
|
355 public void signalError(Throwable error) { |
|
356 terminateNow(Optional.of(error)); |
|
357 } |
|
358 |
|
359 /** |
|
360 * Cancels the subscription by signalling {@code onComplete} to the |
|
361 * subscriber. |
|
362 * |
|
363 * <p> Will not signal {@code onComplete} if the subscription has been |
|
364 * cancelled already. |
|
365 * |
|
366 * <p> This method may be called at any time. |
|
367 */ |
|
368 public void signalComplete() { |
|
369 terminateNow(Optional.empty()); |
|
370 } |
|
371 |
|
372 /** |
|
373 * Must be called first and at most once. |
|
374 */ |
|
375 private void signalSubscribe() { |
|
376 assert subscribed; |
|
377 try { |
|
378 subscriber.onSubscribe(temporarySubscription); |
|
379 } catch (Throwable t) { |
|
380 cancelNow(); |
|
381 throw t; |
|
382 } |
|
383 while (true) { |
|
384 int s = state.get(); |
|
385 if ((s & (PENDING | TERMINATE)) == (PENDING | TERMINATE)) { |
|
386 if (state.compareAndSet( |
|
387 s, CANCELLED | (s & ~(PENDING | TERMINATE)))) { |
|
388 Optional<Throwable> t = terminationType.get(); |
|
389 dispatchTerminationAndUnsubscribe(t); |
|
390 return; |
|
391 } |
|
392 } else if ((s & PENDING) == PENDING) { |
|
393 if (state.compareAndSet(s, ACTIVE | (s & ~PENDING))) { |
|
394 synchronized (lock) { |
|
395 if (feedback != null) { |
|
396 temporarySubscription |
|
397 .replaceWith(new PermanentSubscription()); |
|
398 } |
|
399 } |
|
400 return; |
|
401 } |
|
402 } else { // It should not be in any other state |
|
403 throw new InternalError(String.valueOf(s)); |
|
404 } |
|
405 } |
|
406 } |
|
407 |
|
408 private void unsubscribe() { |
|
409 subscriber = null; |
|
410 } |
|
411 |
|
412 private final static class NopSubscription implements Subscription { |
|
413 |
|
414 @Override |
|
415 public void request(long n) { } |
|
416 @Override |
|
417 public void cancel() { } |
|
418 } |
|
419 |
|
420 private final class PermanentSubscription implements Subscription { |
|
421 |
|
422 @Override |
|
423 public void request(long n) { |
|
424 if (n <= 0) { |
|
425 signalError(new IllegalArgumentException( |
|
426 "non-positive subscription request")); |
|
427 } else { |
|
428 demand.increase(n); |
|
429 feedback.request(n); |
|
430 } |
|
431 } |
|
432 |
|
433 @Override |
|
434 public void cancel() { |
|
435 if (cancelNow()) { |
|
436 unsubscribe(); |
|
437 // feedback.cancel() is called at most once |
|
438 // (let's not assume idempotency) |
|
439 feedback.cancel(); |
|
440 } |
|
441 } |
|
442 } |
|
443 |
|
444 /** |
|
445 * Cancels the subscription unless it has been cancelled already. |
|
446 * |
|
447 * @return {@code true} iff the subscription has been cancelled as a result |
|
448 * of this call |
|
449 */ |
|
450 private boolean cancelNow() { |
|
451 while (true) { |
|
452 int s = state.get(); |
|
453 if ((s & CANCELLED) == CANCELLED) { |
|
454 return false; |
|
455 } else if ((s & (ACTIVE | PENDING)) != 0) { |
|
456 // ACTIVE or PENDING |
|
457 if (state.compareAndSet( |
|
458 s, CANCELLED | (s & ~(ACTIVE | PENDING)))) { |
|
459 unsubscribe(); |
|
460 return true; |
|
461 } |
|
462 } else { |
|
463 throw new InternalError(String.valueOf(s)); |
|
464 } |
|
465 } |
|
466 } |
|
467 |
|
468 /** |
|
469 * Terminates this subscription unless is has been cancelled already. |
|
470 * |
|
471 * @param t the type of termination |
|
472 */ |
|
473 private void terminateNow(Optional<Throwable> t) { |
|
474 // Termination condition must be set only after the termination |
|
475 // type has been set (those will be read in the opposite order) |
|
476 if (!terminationType.compareAndSet(null, t)) { |
|
477 return; |
|
478 } |
|
479 while (true) { |
|
480 int s = state.get(); |
|
481 if ((s & CANCELLED) == CANCELLED) { |
|
482 return; |
|
483 } else if ((s & (PENDING | DELIVERING)) != 0) { |
|
484 // PENDING or DELIVERING (which implies ACTIVE) |
|
485 if (state.compareAndSet(s, s | TERMINATE)) { |
|
486 return; |
|
487 } |
|
488 } else if ((s & ACTIVE) == ACTIVE) { |
|
489 if (state.compareAndSet(s, CANCELLED | (s & ~ACTIVE))) { |
|
490 dispatchTerminationAndUnsubscribe(t); |
|
491 return; |
|
492 } |
|
493 } else { |
|
494 throw new InternalError(String.valueOf(s)); |
|
495 } |
|
496 } |
|
497 } |
|
498 |
|
499 private void dispatchTerminationAndUnsubscribe(Optional<Throwable> t) { |
|
500 try { |
|
501 t.ifPresentOrElse(subscriber::onError, subscriber::onComplete); |
|
502 } finally { |
|
503 unsubscribe(); |
|
504 } |
|
505 } |
|
506 } |