1 /* |
|
2 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 |
|
26 package java.net.http.internal.common; |
|
27 |
|
28 import java.io.Closeable; |
|
29 import java.lang.System.Logger.Level; |
|
30 import java.nio.ByteBuffer; |
|
31 import java.util.List; |
|
32 import java.util.Objects; |
|
33 import java.util.concurrent.CompletableFuture; |
|
34 import java.util.concurrent.ConcurrentLinkedQueue; |
|
35 import java.util.concurrent.Flow; |
|
36 import java.util.concurrent.Flow.Subscriber; |
|
37 import java.util.concurrent.atomic.AtomicLong; |
|
38 import java.util.concurrent.atomic.AtomicReference; |
|
39 |
|
40 /** |
|
41 * A wrapper for a Flow.Subscriber. This wrapper delivers data to the wrapped |
|
42 * Subscriber which is supplied to the constructor. This class takes care of |
|
43 * downstream flow control automatically and upstream flow control automatically |
|
44 * by default. |
|
45 * <p> |
|
46 * Processing is done by implementing the {@link #incoming(List, boolean)} method |
|
47 * which supplies buffers from upstream. This method (or any other method) |
|
48 * can then call the outgoing() method to deliver processed buffers downstream. |
|
49 * <p> |
|
50 * Upstream error signals are delivered downstream directly. Cancellation from |
|
51 * downstream is also propagated upstream immediately. |
|
52 * <p> |
|
53 * Each SubscriberWrapper has a {@link java.util.concurrent.CompletableFuture}{@code <Void>} |
|
54 * which propagates completion/errors from downstream to upstream. Normal completion |
|
55 * can only occur after onComplete() is called, but errors can be propagated upwards |
|
56 * at any time. |
|
57 */ |
|
58 public abstract class SubscriberWrapper |
|
59 implements FlowTube.TubeSubscriber, Closeable, Flow.Processor<List<ByteBuffer>,List<ByteBuffer>> |
|
60 // TODO: SSLTube Subscriber will never change? Does this really need to be a TS? |
|
61 { |
|
62 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
|
63 final System.Logger logger = |
|
64 Utils.getDebugLogger(this::dbgString, DEBUG); |
|
65 |
|
66 public enum SchedulingAction { CONTINUE, RETURN, RESCHEDULE } |
|
67 |
|
68 volatile Flow.Subscription upstreamSubscription; |
|
69 final SubscriptionBase downstreamSubscription; |
|
70 volatile boolean upstreamCompleted; |
|
71 volatile boolean downstreamCompleted; |
|
72 volatile boolean completionAcknowledged; |
|
73 private volatile Subscriber<? super List<ByteBuffer>> downstreamSubscriber; |
|
74 // processed byte to send to the downstream subscriber. |
|
75 private final ConcurrentLinkedQueue<List<ByteBuffer>> outputQ; |
|
76 private final CompletableFuture<Void> cf; |
|
77 private final SequentialScheduler pushScheduler; |
|
78 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
|
79 final AtomicLong upstreamWindow = new AtomicLong(0); |
|
80 |
|
81 /** |
|
82 * Wraps the given downstream subscriber. For each call to {@link |
|
83 * #onNext(List<ByteBuffer>) } the given filter function is invoked |
|
84 * and the list (if not empty) returned is passed downstream. |
|
85 * |
|
86 * A {@code CompletableFuture} is supplied which can be used to signal an |
|
87 * error from downstream and which terminates the wrapper or which signals |
|
88 * completion of downstream activity which can be propagated upstream. Error |
|
89 * completion can be signaled at any time, but normal completion must not be |
|
90 * signaled before onComplete() is called. |
|
91 */ |
|
92 public SubscriberWrapper() |
|
93 { |
|
94 this.outputQ = new ConcurrentLinkedQueue<>(); |
|
95 this.cf = new MinimalFuture<>(); |
|
96 this.pushScheduler = |
|
97 SequentialScheduler.synchronizedScheduler(new DownstreamPusher()); |
|
98 this.downstreamSubscription = new SubscriptionBase(pushScheduler, |
|
99 this::downstreamCompletion); |
|
100 } |
|
101 |
|
102 @Override |
|
103 public final void subscribe(Subscriber<? super List<ByteBuffer>> downstreamSubscriber) { |
|
104 Objects.requireNonNull(downstreamSubscriber); |
|
105 this.downstreamSubscriber = downstreamSubscriber; |
|
106 } |
|
107 |
|
108 /** |
|
109 * Wraps the given downstream wrapper in this. For each call to |
|
110 * {@link #onNext(List<ByteBuffer>) } the incoming() method is called. |
|
111 * |
|
112 * The {@code downstreamCF} from the downstream wrapper is linked to this |
|
113 * wrappers notifier. |
|
114 * |
|
115 * @param downstreamWrapper downstream destination |
|
116 */ |
|
117 public SubscriberWrapper(Subscriber<? super List<ByteBuffer>> downstreamWrapper) |
|
118 { |
|
119 this(); |
|
120 subscribe(downstreamWrapper); |
|
121 } |
|
122 |
|
123 /** |
|
124 * Delivers data to be processed by this wrapper. Generated data to be sent |
|
125 * downstream, must be provided to the {@link #outgoing(List, boolean)}} |
|
126 * method. |
|
127 * |
|
128 * @param buffers a List of ByteBuffers. |
|
129 * @param complete if true then no more data will be added to the list |
|
130 */ |
|
131 protected abstract void incoming(List<ByteBuffer> buffers, boolean complete); |
|
132 |
|
133 /** |
|
134 * This method is called to determine the window size to use at any time. The |
|
135 * current window is supplied together with the current downstream queue size. |
|
136 * {@code 0} should be returned if no change is |
|
137 * required or a positive integer which will be added to the current window. |
|
138 * The default implementation maintains a downstream queue size of no greater |
|
139 * than 5. The method can be overridden if required. |
|
140 * |
|
141 * @param currentWindow the current upstream subscription window |
|
142 * @param downstreamQsize the current number of buffers waiting to be sent |
|
143 * downstream |
|
144 * |
|
145 * @return value to add to currentWindow |
|
146 */ |
|
147 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { |
|
148 if (downstreamQsize > 5) { |
|
149 return 0; |
|
150 } |
|
151 |
|
152 if (currentWindow == 0) { |
|
153 return 1; |
|
154 } else { |
|
155 return 0; |
|
156 } |
|
157 } |
|
158 |
|
159 /** |
|
160 * Override this if anything needs to be done after the upstream subscriber |
|
161 * has subscribed |
|
162 */ |
|
163 protected void onSubscribe() { |
|
164 } |
|
165 |
|
166 /** |
|
167 * Override this if anything needs to be done before checking for error |
|
168 * and processing the input queue. |
|
169 * @return |
|
170 */ |
|
171 protected SchedulingAction enterScheduling() { |
|
172 return SchedulingAction.CONTINUE; |
|
173 } |
|
174 |
|
175 protected boolean signalScheduling() { |
|
176 if (downstreamCompleted || pushScheduler.isStopped()) { |
|
177 return false; |
|
178 } |
|
179 pushScheduler.runOrSchedule(); |
|
180 return true; |
|
181 } |
|
182 |
|
183 /** |
|
184 * Delivers buffers of data downstream. After incoming() |
|
185 * has been called complete == true signifying completion of the upstream |
|
186 * subscription, data may continue to be delivered, up to when outgoing() is |
|
187 * called complete == true, after which, the downstream subscription is |
|
188 * completed. |
|
189 * |
|
190 * It's an error to call outgoing() with complete = true if incoming() has |
|
191 * not previously been called with it. |
|
192 */ |
|
193 public void outgoing(ByteBuffer buffer, boolean complete) { |
|
194 Objects.requireNonNull(buffer); |
|
195 assert !complete || !buffer.hasRemaining(); |
|
196 outgoing(List.of(buffer), complete); |
|
197 } |
|
198 |
|
199 /** |
|
200 * Sometime it might be necessary to complete the downstream subscriber |
|
201 * before the upstream completes. For instance, when an SSL server |
|
202 * sends a notify_close. In that case we should let the outgoing |
|
203 * complete before upstream us completed. |
|
204 * @return true, may be overridden by subclasses. |
|
205 */ |
|
206 public boolean closing() { |
|
207 return false; |
|
208 } |
|
209 |
|
210 public void outgoing(List<ByteBuffer> buffers, boolean complete) { |
|
211 Objects.requireNonNull(buffers); |
|
212 if (complete) { |
|
213 assert Utils.remaining(buffers) == 0; |
|
214 boolean closing = closing(); |
|
215 logger.log(Level.DEBUG, |
|
216 "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s", |
|
217 upstreamCompleted, downstreamCompleted, closing); |
|
218 if (!upstreamCompleted && !closing) |
|
219 throw new IllegalStateException("upstream not completed"); |
|
220 completionAcknowledged = true; |
|
221 } else { |
|
222 logger.log(Level.DEBUG, () -> "Adding " |
|
223 + Utils.remaining(buffers) |
|
224 + " to outputQ queue"); |
|
225 outputQ.add(buffers); |
|
226 } |
|
227 logger.log(Level.DEBUG, () -> "pushScheduler " |
|
228 + (pushScheduler.isStopped() ? " is stopped!" : " is alive")); |
|
229 pushScheduler.runOrSchedule(); |
|
230 } |
|
231 |
|
232 /** |
|
233 * Returns a CompletableFuture which completes when this wrapper completes. |
|
234 * Normal completion happens with the following steps (in order): |
|
235 * 1. onComplete() is called |
|
236 * 2. incoming() called with complete = true |
|
237 * 3. outgoing() may continue to be called normally |
|
238 * 4. outgoing called with complete = true |
|
239 * 5. downstream subscriber is called onComplete() |
|
240 * |
|
241 * If the subscription is canceled or onComplete() is invoked the |
|
242 * CompletableFuture completes exceptionally. Exceptional completion |
|
243 * also occurs if downstreamCF completes exceptionally. |
|
244 */ |
|
245 public CompletableFuture<Void> completion() { |
|
246 return cf; |
|
247 } |
|
248 |
|
249 /** |
|
250 * Invoked whenever it 'may' be possible to push buffers downstream. |
|
251 */ |
|
252 class DownstreamPusher implements Runnable { |
|
253 @Override |
|
254 public void run() { |
|
255 try { |
|
256 run1(); |
|
257 } catch (Throwable t) { |
|
258 errorCommon(t); |
|
259 } |
|
260 } |
|
261 |
|
262 private void run1() { |
|
263 if (downstreamCompleted) { |
|
264 logger.log(Level.DEBUG, "DownstreamPusher: downstream is already completed"); |
|
265 return; |
|
266 } |
|
267 switch (enterScheduling()) { |
|
268 case CONTINUE: break; |
|
269 case RESCHEDULE: pushScheduler.runOrSchedule(); return; |
|
270 case RETURN: return; |
|
271 default: |
|
272 errorRef.compareAndSet(null, |
|
273 new InternalError("unknown scheduling command")); |
|
274 break; |
|
275 } |
|
276 // If there was an error, send it downstream. |
|
277 Throwable error = errorRef.get(); |
|
278 if (error != null) { |
|
279 synchronized(this) { |
|
280 if (downstreamCompleted) return; |
|
281 downstreamCompleted = true; |
|
282 } |
|
283 logger.log(Level.DEBUG, |
|
284 () -> "DownstreamPusher: forwarding error downstream: " + error); |
|
285 pushScheduler.stop(); |
|
286 outputQ.clear(); |
|
287 downstreamSubscriber.onError(error); |
|
288 return; |
|
289 } |
|
290 |
|
291 // OK - no error, let's proceed |
|
292 if (!outputQ.isEmpty()) { |
|
293 logger.log(Level.DEBUG, |
|
294 "DownstreamPusher: queue not empty, downstreamSubscription: %s", |
|
295 downstreamSubscription); |
|
296 } else { |
|
297 logger.log(Level.DEBUG, |
|
298 "DownstreamPusher: queue empty, downstreamSubscription: %s", |
|
299 downstreamSubscription); |
|
300 } |
|
301 |
|
302 final boolean dbgOn = logger.isLoggable(Level.DEBUG); |
|
303 while (!outputQ.isEmpty() && downstreamSubscription.tryDecrement()) { |
|
304 List<ByteBuffer> b = outputQ.poll(); |
|
305 if (dbgOn) logger.log(Level.DEBUG, |
|
306 "DownstreamPusher: Pushing " |
|
307 + Utils.remaining(b) |
|
308 + " bytes downstream"); |
|
309 downstreamSubscriber.onNext(b); |
|
310 } |
|
311 upstreamWindowUpdate(); |
|
312 checkCompletion(); |
|
313 } |
|
314 } |
|
315 |
|
316 void upstreamWindowUpdate() { |
|
317 long downstreamQueueSize = outputQ.size(); |
|
318 long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize); |
|
319 if (n > 0) |
|
320 upstreamRequest(n); |
|
321 } |
|
322 |
|
323 @Override |
|
324 public void onSubscribe(Flow.Subscription subscription) { |
|
325 if (upstreamSubscription != null) { |
|
326 throw new IllegalStateException("Single shot publisher"); |
|
327 } |
|
328 this.upstreamSubscription = subscription; |
|
329 upstreamRequest(upstreamWindowUpdate(0, 0)); |
|
330 logger.log(Level.DEBUG, |
|
331 "calling downstreamSubscriber::onSubscribe on %s", |
|
332 downstreamSubscriber); |
|
333 downstreamSubscriber.onSubscribe(downstreamSubscription); |
|
334 onSubscribe(); |
|
335 } |
|
336 |
|
337 @Override |
|
338 public void onNext(List<ByteBuffer> item) { |
|
339 logger.log(Level.DEBUG, "onNext"); |
|
340 long prev = upstreamWindow.getAndDecrement(); |
|
341 if (prev <= 0) |
|
342 throw new IllegalStateException("invalid onNext call"); |
|
343 incomingCaller(item, false); |
|
344 upstreamWindowUpdate(); |
|
345 } |
|
346 |
|
347 private void upstreamRequest(long n) { |
|
348 logger.log(Level.DEBUG, "requesting %d", n); |
|
349 upstreamWindow.getAndAdd(n); |
|
350 upstreamSubscription.request(n); |
|
351 } |
|
352 |
|
353 protected void requestMore() { |
|
354 if (upstreamWindow.get() == 0) { |
|
355 upstreamRequest(1); |
|
356 } |
|
357 } |
|
358 |
|
359 public long upstreamWindow() { |
|
360 return upstreamWindow.get(); |
|
361 } |
|
362 |
|
363 @Override |
|
364 public void onError(Throwable throwable) { |
|
365 logger.log(Level.DEBUG, () -> "onError: " + throwable); |
|
366 errorCommon(Objects.requireNonNull(throwable)); |
|
367 } |
|
368 |
|
369 protected boolean errorCommon(Throwable throwable) { |
|
370 assert throwable != null || |
|
371 (throwable = new AssertionError("null throwable")) != null; |
|
372 if (errorRef.compareAndSet(null, throwable)) { |
|
373 logger.log(Level.DEBUG, "error", throwable); |
|
374 pushScheduler.runOrSchedule(); |
|
375 upstreamCompleted = true; |
|
376 cf.completeExceptionally(throwable); |
|
377 return true; |
|
378 } |
|
379 return false; |
|
380 } |
|
381 |
|
382 @Override |
|
383 public void close() { |
|
384 errorCommon(new RuntimeException("wrapper closed")); |
|
385 } |
|
386 |
|
387 private void incomingCaller(List<ByteBuffer> l, boolean complete) { |
|
388 try { |
|
389 incoming(l, complete); |
|
390 } catch(Throwable t) { |
|
391 errorCommon(t); |
|
392 } |
|
393 } |
|
394 |
|
395 @Override |
|
396 public void onComplete() { |
|
397 logger.log(Level.DEBUG, () -> "upstream completed: " + toString()); |
|
398 upstreamCompleted = true; |
|
399 incomingCaller(Utils.EMPTY_BB_LIST, true); |
|
400 // pushScheduler will call checkCompletion() |
|
401 pushScheduler.runOrSchedule(); |
|
402 } |
|
403 |
|
404 /** Adds the given data to the input queue. */ |
|
405 public void addData(ByteBuffer l) { |
|
406 if (upstreamSubscription == null) { |
|
407 throw new IllegalStateException("can't add data before upstream subscriber subscribes"); |
|
408 } |
|
409 incomingCaller(List.of(l), false); |
|
410 } |
|
411 |
|
412 void checkCompletion() { |
|
413 if (downstreamCompleted || !upstreamCompleted) { |
|
414 return; |
|
415 } |
|
416 if (!outputQ.isEmpty()) { |
|
417 return; |
|
418 } |
|
419 if (errorRef.get() != null) { |
|
420 pushScheduler.runOrSchedule(); |
|
421 return; |
|
422 } |
|
423 if (completionAcknowledged) { |
|
424 logger.log(Level.DEBUG, "calling downstreamSubscriber.onComplete()"); |
|
425 downstreamSubscriber.onComplete(); |
|
426 // Fix me subscriber.onComplete.run(); |
|
427 downstreamCompleted = true; |
|
428 cf.complete(null); |
|
429 } |
|
430 } |
|
431 |
|
432 // called from the downstream Subscription.cancel() |
|
433 void downstreamCompletion() { |
|
434 upstreamSubscription.cancel(); |
|
435 cf.complete(null); |
|
436 } |
|
437 |
|
438 public void resetDownstreamDemand() { |
|
439 downstreamSubscription.demand.reset(); |
|
440 } |
|
441 |
|
442 @Override |
|
443 public String toString() { |
|
444 StringBuilder sb = new StringBuilder(); |
|
445 sb.append("SubscriberWrapper:") |
|
446 .append(" upstreamCompleted: ").append(Boolean.toString(upstreamCompleted)) |
|
447 .append(" upstreamWindow: ").append(upstreamWindow.toString()) |
|
448 .append(" downstreamCompleted: ").append(Boolean.toString(downstreamCompleted)) |
|
449 .append(" completionAcknowledged: ").append(Boolean.toString(completionAcknowledged)) |
|
450 .append(" outputQ size: ").append(Integer.toString(outputQ.size())) |
|
451 //.append(" outputQ: ").append(outputQ.toString()) |
|
452 .append(" cf: ").append(cf.toString()) |
|
453 .append(" downstreamSubscription: ").append(downstreamSubscription.toString()); |
|
454 |
|
455 return sb.toString(); |
|
456 } |
|
457 |
|
458 public String dbgString() { |
|
459 return "SubscriberWrapper"; |
|
460 } |
|
461 } |
|