|
1 /* |
|
2 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 |
|
26 package jdk.internal.net.http; |
|
27 |
|
28 import java.io.IOException; |
|
29 import java.lang.System.Logger.Level; |
|
30 import java.nio.ByteBuffer; |
|
31 import java.util.List; |
|
32 import java.util.Objects; |
|
33 import java.util.concurrent.Flow; |
|
34 import java.util.concurrent.atomic.AtomicLong; |
|
35 import java.util.concurrent.atomic.AtomicReference; |
|
36 import java.nio.channels.SelectableChannel; |
|
37 import java.nio.channels.SelectionKey; |
|
38 import java.nio.channels.SocketChannel; |
|
39 import java.util.ArrayList; |
|
40 import java.util.function.Consumer; |
|
41 import java.util.function.Supplier; |
|
42 import jdk.internal.net.http.common.Demand; |
|
43 import jdk.internal.net.http.common.FlowTube; |
|
44 import jdk.internal.net.http.common.Logger; |
|
45 import jdk.internal.net.http.common.SequentialScheduler; |
|
46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; |
|
47 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask; |
|
48 import jdk.internal.net.http.common.Utils; |
|
49 |
|
50 /** |
|
51 * A SocketTube is a terminal tube plugged directly into the socket. |
|
52 * The read subscriber should call {@code subscribe} on the SocketTube before |
|
53 * the SocketTube is subscribed to the write publisher. |
|
54 */ |
|
55 final class SocketTube implements FlowTube { |
|
56 |
|
57 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
|
58 static final AtomicLong IDS = new AtomicLong(); |
|
59 |
|
60 private final HttpClientImpl client; |
|
61 private final SocketChannel channel; |
|
62 private final Supplier<ByteBuffer> buffersSource; |
|
63 private final Object lock = new Object(); |
|
64 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
|
65 private final InternalReadPublisher readPublisher; |
|
66 private final InternalWriteSubscriber writeSubscriber; |
|
67 private final long id = IDS.incrementAndGet(); |
|
68 |
|
69 public SocketTube(HttpClientImpl client, SocketChannel channel, |
|
70 Supplier<ByteBuffer> buffersSource) { |
|
71 this.client = client; |
|
72 this.channel = channel; |
|
73 this.buffersSource = buffersSource; |
|
74 this.readPublisher = new InternalReadPublisher(); |
|
75 this.writeSubscriber = new InternalWriteSubscriber(); |
|
76 } |
|
77 |
|
78 /** |
|
79 * Returns {@code true} if this flow is finished. |
|
80 * This happens when this flow internal read subscription is completed, |
|
81 * either normally (EOF reading) or exceptionally (EOF writing, or |
|
82 * underlying socket closed, or some exception occurred while reading or |
|
83 * writing to the socket). |
|
84 * |
|
85 * @return {@code true} if this flow is finished. |
|
86 */ |
|
87 public boolean isFinished() { |
|
88 InternalReadPublisher.InternalReadSubscription subscription = |
|
89 readPublisher.subscriptionImpl; |
|
90 return subscription != null && subscription.completed |
|
91 || subscription == null && errorRef.get() != null; |
|
92 } |
|
93 |
|
94 // ===================================================================== // |
|
95 // Flow.Publisher // |
|
96 // ======================================================================// |
|
97 |
|
98 /** |
|
99 * {@inheritDoc } |
|
100 * @apiNote This method should be called first. In particular, the caller |
|
101 * must ensure that this method must be called by the read |
|
102 * subscriber before the write publisher can call {@code onSubscribe}. |
|
103 * Failure to adhere to this contract may result in assertion errors. |
|
104 */ |
|
105 @Override |
|
106 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { |
|
107 Objects.requireNonNull(s); |
|
108 assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s; |
|
109 readPublisher.subscribe(s); |
|
110 } |
|
111 |
|
112 |
|
113 // ===================================================================== // |
|
114 // Flow.Subscriber // |
|
115 // ======================================================================// |
|
116 |
|
117 /** |
|
118 * {@inheritDoc } |
|
119 * @apiNote The caller must ensure that {@code subscribe} is called by |
|
120 * the read subscriber before {@code onSubscribe} is called by |
|
121 * the write publisher. |
|
122 * Failure to adhere to this contract may result in assertion errors. |
|
123 */ |
|
124 @Override |
|
125 public void onSubscribe(Flow.Subscription subscription) { |
|
126 writeSubscriber.onSubscribe(subscription); |
|
127 } |
|
128 |
|
129 @Override |
|
130 public void onNext(List<ByteBuffer> item) { |
|
131 writeSubscriber.onNext(item); |
|
132 } |
|
133 |
|
134 @Override |
|
135 public void onError(Throwable throwable) { |
|
136 writeSubscriber.onError(throwable); |
|
137 } |
|
138 |
|
139 @Override |
|
140 public void onComplete() { |
|
141 writeSubscriber.onComplete(); |
|
142 } |
|
143 |
|
144 // ===================================================================== // |
|
145 // Events // |
|
146 // ======================================================================// |
|
147 |
|
148 void signalClosed() { |
|
149 // Ensures that the subscriber will be terminated and that future |
|
150 // subscribers will be notified when the connection is closed. |
|
151 readPublisher.subscriptionImpl.signalError( |
|
152 new IOException("connection closed locally")); |
|
153 } |
|
154 |
|
155 /** |
|
156 * A restartable task used to process tasks in sequence. |
|
157 */ |
|
158 private static class SocketFlowTask implements RestartableTask { |
|
159 final Runnable task; |
|
160 private final Object monitor = new Object(); |
|
161 SocketFlowTask(Runnable task) { |
|
162 this.task = task; |
|
163 } |
|
164 @Override |
|
165 public final void run(DeferredCompleter taskCompleter) { |
|
166 try { |
|
167 // non contentious synchronized for visibility. |
|
168 synchronized(monitor) { |
|
169 task.run(); |
|
170 } |
|
171 } finally { |
|
172 taskCompleter.complete(); |
|
173 } |
|
174 } |
|
175 } |
|
176 |
|
177 // This is best effort - there's no guarantee that the printed set of values |
|
178 // is consistent. It should only be considered as weakly accurate - in |
|
179 // particular in what concerns the events states, especially when displaying |
|
180 // a read event state from a write event callback and conversely. |
|
181 void debugState(String when) { |
|
182 if (debug.on()) { |
|
183 StringBuilder state = new StringBuilder(); |
|
184 |
|
185 InternalReadPublisher.InternalReadSubscription sub = |
|
186 readPublisher.subscriptionImpl; |
|
187 InternalReadPublisher.ReadEvent readEvent = |
|
188 sub == null ? null : sub.readEvent; |
|
189 Demand rdemand = sub == null ? null : sub.demand; |
|
190 InternalWriteSubscriber.WriteEvent writeEvent = |
|
191 writeSubscriber.writeEvent; |
|
192 Demand wdemand = writeSubscriber.writeDemand; |
|
193 int rops = readEvent == null ? 0 : readEvent.interestOps(); |
|
194 long rd = rdemand == null ? 0 : rdemand.get(); |
|
195 int wops = writeEvent == null ? 0 : writeEvent.interestOps(); |
|
196 long wd = wdemand == null ? 0 : wdemand.get(); |
|
197 |
|
198 state.append(when).append(" Reading: [ops=") |
|
199 .append(rops).append(", demand=").append(rd) |
|
200 .append(", stopped=") |
|
201 .append((sub == null ? false : sub.readScheduler.isStopped())) |
|
202 .append("], Writing: [ops=").append(wops) |
|
203 .append(", demand=").append(wd) |
|
204 .append("]"); |
|
205 debug.log(state.toString()); |
|
206 } |
|
207 } |
|
208 |
|
209 /** |
|
210 * A repeatable event that can be paused or resumed by changing its |
|
211 * interestOps. When the event is fired, it is first paused before being |
|
212 * signaled. It is the responsibility of the code triggered by |
|
213 * {@code signalEvent} to resume the event if required. |
|
214 */ |
|
215 private static abstract class SocketFlowEvent extends AsyncEvent { |
|
216 final SocketChannel channel; |
|
217 final int defaultInterest; |
|
218 volatile int interestOps; |
|
219 volatile boolean registered; |
|
220 SocketFlowEvent(int defaultInterest, SocketChannel channel) { |
|
221 super(AsyncEvent.REPEATING); |
|
222 this.defaultInterest = defaultInterest; |
|
223 this.channel = channel; |
|
224 } |
|
225 final boolean registered() {return registered;} |
|
226 final void resume() { |
|
227 interestOps = defaultInterest; |
|
228 registered = true; |
|
229 } |
|
230 final void pause() {interestOps = 0;} |
|
231 @Override |
|
232 public final SelectableChannel channel() {return channel;} |
|
233 @Override |
|
234 public final int interestOps() {return interestOps;} |
|
235 |
|
236 @Override |
|
237 public final void handle() { |
|
238 pause(); // pause, then signal |
|
239 signalEvent(); // won't be fired again until resumed. |
|
240 } |
|
241 @Override |
|
242 public final void abort(IOException error) { |
|
243 debug().log(() -> "abort: " + error); |
|
244 pause(); // pause, then signal |
|
245 signalError(error); // should not be resumed after abort (not checked) |
|
246 } |
|
247 |
|
248 protected abstract void signalEvent(); |
|
249 protected abstract void signalError(Throwable error); |
|
250 abstract Logger debug(); |
|
251 } |
|
252 |
|
253 // ===================================================================== // |
|
254 // Writing // |
|
255 // ======================================================================// |
|
256 |
|
257 // This class makes the assumption that the publisher will call onNext |
|
258 // sequentially, and that onNext won't be called if the demand has not been |
|
259 // incremented by request(1). |
|
260 // It has a 'queue of 1' meaning that it will call request(1) in |
|
261 // onSubscribe, and then only after its 'current' buffer list has been |
|
262 // fully written and current set to null; |
|
263 private final class InternalWriteSubscriber |
|
264 implements Flow.Subscriber<List<ByteBuffer>> { |
|
265 |
|
266 volatile WriteSubscription subscription; |
|
267 volatile List<ByteBuffer> current; |
|
268 volatile boolean completed; |
|
269 final AsyncTriggerEvent startSubscription = |
|
270 new AsyncTriggerEvent(this::signalError, this::startSubscription); |
|
271 final WriteEvent writeEvent = new WriteEvent(channel, this); |
|
272 final Demand writeDemand = new Demand(); |
|
273 |
|
274 @Override |
|
275 public void onSubscribe(Flow.Subscription subscription) { |
|
276 WriteSubscription previous = this.subscription; |
|
277 if (debug.on()) debug.log("subscribed for writing"); |
|
278 try { |
|
279 boolean needEvent = current == null; |
|
280 if (needEvent) { |
|
281 if (previous != null && previous.upstreamSubscription != subscription) { |
|
282 previous.dropSubscription(); |
|
283 } |
|
284 } |
|
285 this.subscription = new WriteSubscription(subscription); |
|
286 if (needEvent) { |
|
287 if (debug.on()) |
|
288 debug.log("write: registering startSubscription event"); |
|
289 client.registerEvent(startSubscription); |
|
290 } |
|
291 } catch (Throwable t) { |
|
292 signalError(t); |
|
293 } |
|
294 } |
|
295 |
|
296 @Override |
|
297 public void onNext(List<ByteBuffer> bufs) { |
|
298 assert current == null : dbgString() // this is a queue of 1. |
|
299 + "w.onNext current: " + current; |
|
300 assert subscription != null : dbgString() |
|
301 + "w.onNext: subscription is null"; |
|
302 current = bufs; |
|
303 tryFlushCurrent(client.isSelectorThread()); // may be in selector thread |
|
304 // For instance in HTTP/2, a received SETTINGS frame might trigger |
|
305 // the sending of a SETTINGS frame in turn which might cause |
|
306 // onNext to be called from within the same selector thread that the |
|
307 // original SETTINGS frames arrived on. If rs is the read-subscriber |
|
308 // and ws is the write-subscriber then the following can occur: |
|
309 // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write |
|
310 // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent |
|
311 debugState("leaving w.onNext"); |
|
312 } |
|
313 |
|
314 // Don't use a SequentialScheduler here: rely on onNext() being invoked |
|
315 // sequentially, and not being invoked if there is no demand, request(1). |
|
316 // onNext is usually called from within a user / executor thread. |
|
317 // Initial writing will be performed in that thread. If for some reason, |
|
318 // not all the data can be written, a writeEvent will be registered, and |
|
319 // writing will resume in the the selector manager thread when the |
|
320 // writeEvent is fired. |
|
321 // |
|
322 // If this method is invoked in the selector manager thread (because of |
|
323 // a writeEvent), then the executor will be used to invoke request(1), |
|
324 // ensuring that onNext() won't be invoked from within the selector |
|
325 // thread. If not in the selector manager thread, then request(1) is |
|
326 // invoked directly. |
|
327 void tryFlushCurrent(boolean inSelectorThread) { |
|
328 List<ByteBuffer> bufs = current; |
|
329 if (bufs == null) return; |
|
330 try { |
|
331 assert inSelectorThread == client.isSelectorThread() : |
|
332 "should " + (inSelectorThread ? "" : "not ") |
|
333 + " be in the selector thread"; |
|
334 long remaining = Utils.remaining(bufs); |
|
335 if (debug.on()) debug.log("trying to write: %d", remaining); |
|
336 long written = writeAvailable(bufs); |
|
337 if (debug.on()) debug.log("wrote: %d", written); |
|
338 assert written >= 0 : "negative number of bytes written:" + written; |
|
339 assert written <= remaining; |
|
340 if (remaining - written == 0) { |
|
341 current = null; |
|
342 if (writeDemand.tryDecrement()) { |
|
343 Runnable requestMore = this::requestMore; |
|
344 if (inSelectorThread) { |
|
345 assert client.isSelectorThread(); |
|
346 client.theExecutor().execute(requestMore); |
|
347 } else { |
|
348 assert !client.isSelectorThread(); |
|
349 requestMore.run(); |
|
350 } |
|
351 } |
|
352 } else { |
|
353 resumeWriteEvent(inSelectorThread); |
|
354 } |
|
355 } catch (Throwable t) { |
|
356 signalError(t); |
|
357 subscription.cancel(); |
|
358 } |
|
359 } |
|
360 |
|
361 // Kick off the initial request:1 that will start the writing side. |
|
362 // Invoked in the selector manager thread. |
|
363 void startSubscription() { |
|
364 try { |
|
365 if (debug.on()) debug.log("write: starting subscription"); |
|
366 assert client.isSelectorThread(); |
|
367 // make sure read registrations are handled before; |
|
368 readPublisher.subscriptionImpl.handlePending(); |
|
369 if (debug.on()) debug.log("write: offloading requestMore"); |
|
370 // start writing; |
|
371 client.theExecutor().execute(this::requestMore); |
|
372 } catch(Throwable t) { |
|
373 signalError(t); |
|
374 } |
|
375 } |
|
376 |
|
377 void requestMore() { |
|
378 WriteSubscription subscription = this.subscription; |
|
379 subscription.requestMore(); |
|
380 } |
|
381 |
|
382 @Override |
|
383 public void onError(Throwable throwable) { |
|
384 signalError(throwable); |
|
385 } |
|
386 |
|
387 @Override |
|
388 public void onComplete() { |
|
389 completed = true; |
|
390 // no need to pause the write event here: the write event will |
|
391 // be paused if there is nothing more to write. |
|
392 List<ByteBuffer> bufs = current; |
|
393 long remaining = bufs == null ? 0 : Utils.remaining(bufs); |
|
394 if (debug.on()) |
|
395 debug.log( "write completed, %d yet to send", remaining); |
|
396 debugState("InternalWriteSubscriber::onComplete"); |
|
397 } |
|
398 |
|
399 void resumeWriteEvent(boolean inSelectorThread) { |
|
400 if (debug.on()) debug.log("scheduling write event"); |
|
401 resumeEvent(writeEvent, this::signalError); |
|
402 } |
|
403 |
|
404 void signalWritable() { |
|
405 if (debug.on()) debug.log("channel is writable"); |
|
406 tryFlushCurrent(true); |
|
407 } |
|
408 |
|
409 void signalError(Throwable error) { |
|
410 debug.log(() -> "write error: " + error); |
|
411 completed = true; |
|
412 readPublisher.signalError(error); |
|
413 } |
|
414 |
|
415 // A repeatable WriteEvent which is paused after firing and can |
|
416 // be resumed if required - see SocketFlowEvent; |
|
417 final class WriteEvent extends SocketFlowEvent { |
|
418 final InternalWriteSubscriber sub; |
|
419 WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) { |
|
420 super(SelectionKey.OP_WRITE, channel); |
|
421 this.sub = sub; |
|
422 } |
|
423 @Override |
|
424 protected final void signalEvent() { |
|
425 try { |
|
426 client.eventUpdated(this); |
|
427 sub.signalWritable(); |
|
428 } catch(Throwable t) { |
|
429 sub.signalError(t); |
|
430 } |
|
431 } |
|
432 |
|
433 @Override |
|
434 protected void signalError(Throwable error) { |
|
435 sub.signalError(error); |
|
436 } |
|
437 |
|
438 @Override |
|
439 Logger debug() { return debug; } |
|
440 } |
|
441 |
|
442 final class WriteSubscription implements Flow.Subscription { |
|
443 final Flow.Subscription upstreamSubscription; |
|
444 volatile boolean cancelled; |
|
445 WriteSubscription(Flow.Subscription subscription) { |
|
446 this.upstreamSubscription = subscription; |
|
447 } |
|
448 |
|
449 @Override |
|
450 public void request(long n) { |
|
451 if (cancelled) return; |
|
452 upstreamSubscription.request(n); |
|
453 } |
|
454 |
|
455 @Override |
|
456 public void cancel() { |
|
457 dropSubscription(); |
|
458 upstreamSubscription.cancel(); |
|
459 } |
|
460 |
|
461 void dropSubscription() { |
|
462 synchronized (InternalWriteSubscriber.this) { |
|
463 cancelled = true; |
|
464 if (debug.on()) debug.log("write: resetting demand to 0"); |
|
465 writeDemand.reset(); |
|
466 } |
|
467 } |
|
468 |
|
469 void requestMore() { |
|
470 try { |
|
471 if (completed || cancelled) return; |
|
472 boolean requestMore; |
|
473 long d; |
|
474 // don't fiddle with demand after cancel. |
|
475 // see dropSubscription. |
|
476 synchronized (InternalWriteSubscriber.this) { |
|
477 if (cancelled) return; |
|
478 d = writeDemand.get(); |
|
479 requestMore = writeDemand.increaseIfFulfilled(); |
|
480 } |
|
481 if (requestMore) { |
|
482 if (debug.on()) debug.log("write: requesting more..."); |
|
483 upstreamSubscription.request(1); |
|
484 } else { |
|
485 if (debug.on()) |
|
486 debug.log("write: no need to request more: %d", d); |
|
487 } |
|
488 } catch (Throwable t) { |
|
489 if (debug.on()) |
|
490 debug.log("write: error while requesting more: " + t); |
|
491 cancelled = true; |
|
492 signalError(t); |
|
493 subscription.cancel(); |
|
494 } finally { |
|
495 debugState("leaving requestMore: "); |
|
496 } |
|
497 } |
|
498 } |
|
499 } |
|
500 |
|
501 // ===================================================================== // |
|
502 // Reading // |
|
503 // ===================================================================== // |
|
504 |
|
505 // The InternalReadPublisher uses a SequentialScheduler to ensure that |
|
506 // onNext/onError/onComplete are called sequentially on the caller's |
|
507 // subscriber. |
|
508 // However, it relies on the fact that the only time where |
|
509 // runOrSchedule() is called from a user/executor thread is in signalError, |
|
510 // right after the errorRef has been set. |
|
511 // Because the sequential scheduler's task always checks for errors first, |
|
512 // and always terminate the scheduler on error, then it is safe to assume |
|
513 // that if it reaches the point where it reads from the channel, then |
|
514 // it is running in the SelectorManager thread. This is because all |
|
515 // other invocation of runOrSchedule() are triggered from within a |
|
516 // ReadEvent. |
|
517 // |
|
518 // When pausing/resuming the event, some shortcuts can then be taken |
|
519 // when we know we're running in the selector manager thread |
|
520 // (in that case there's no need to call client.eventUpdated(readEvent); |
|
521 // |
|
522 private final class InternalReadPublisher |
|
523 implements Flow.Publisher<List<ByteBuffer>> { |
|
524 private final InternalReadSubscription subscriptionImpl |
|
525 = new InternalReadSubscription(); |
|
526 AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>(); |
|
527 private volatile ReadSubscription subscription; |
|
528 |
|
529 @Override |
|
530 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { |
|
531 Objects.requireNonNull(s); |
|
532 |
|
533 TubeSubscriber sub = FlowTube.asTubeSubscriber(s); |
|
534 ReadSubscription target = new ReadSubscription(subscriptionImpl, sub); |
|
535 ReadSubscription previous = pendingSubscription.getAndSet(target); |
|
536 |
|
537 if (previous != null && previous != target) { |
|
538 if (debug.on()) |
|
539 debug.log("read publisher: dropping pending subscriber: " |
|
540 + previous.subscriber); |
|
541 previous.errorRef.compareAndSet(null, errorRef.get()); |
|
542 previous.signalOnSubscribe(); |
|
543 if (subscriptionImpl.completed) { |
|
544 previous.signalCompletion(); |
|
545 } else { |
|
546 previous.subscriber.dropSubscription(); |
|
547 } |
|
548 } |
|
549 |
|
550 if (debug.on()) debug.log("read publisher got subscriber"); |
|
551 subscriptionImpl.signalSubscribe(); |
|
552 debugState("leaving read.subscribe: "); |
|
553 } |
|
554 |
|
555 void signalError(Throwable error) { |
|
556 if (debug.on()) debug.log("error signalled " + error); |
|
557 if (!errorRef.compareAndSet(null, error)) { |
|
558 return; |
|
559 } |
|
560 subscriptionImpl.handleError(); |
|
561 } |
|
562 |
|
563 final class ReadSubscription implements Flow.Subscription { |
|
564 final InternalReadSubscription impl; |
|
565 final TubeSubscriber subscriber; |
|
566 final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
|
567 volatile boolean subscribed; |
|
568 volatile boolean cancelled; |
|
569 volatile boolean completed; |
|
570 |
|
571 public ReadSubscription(InternalReadSubscription impl, |
|
572 TubeSubscriber subscriber) { |
|
573 this.impl = impl; |
|
574 this.subscriber = subscriber; |
|
575 } |
|
576 |
|
577 @Override |
|
578 public void cancel() { |
|
579 cancelled = true; |
|
580 } |
|
581 |
|
582 @Override |
|
583 public void request(long n) { |
|
584 if (!cancelled) { |
|
585 impl.request(n); |
|
586 } else { |
|
587 if (debug.on()) |
|
588 debug.log("subscription cancelled, ignoring request %d", n); |
|
589 } |
|
590 } |
|
591 |
|
592 void signalCompletion() { |
|
593 assert subscribed || cancelled; |
|
594 if (completed || cancelled) return; |
|
595 synchronized (this) { |
|
596 if (completed) return; |
|
597 completed = true; |
|
598 } |
|
599 Throwable error = errorRef.get(); |
|
600 if (error != null) { |
|
601 if (debug.on()) |
|
602 debug.log("forwarding error to subscriber: " + error); |
|
603 subscriber.onError(error); |
|
604 } else { |
|
605 if (debug.on()) debug.log("completing subscriber"); |
|
606 subscriber.onComplete(); |
|
607 } |
|
608 } |
|
609 |
|
610 void signalOnSubscribe() { |
|
611 if (subscribed || cancelled) return; |
|
612 synchronized (this) { |
|
613 if (subscribed || cancelled) return; |
|
614 subscribed = true; |
|
615 } |
|
616 subscriber.onSubscribe(this); |
|
617 if (debug.on()) debug.log("onSubscribe called"); |
|
618 if (errorRef.get() != null) { |
|
619 signalCompletion(); |
|
620 } |
|
621 } |
|
622 } |
|
623 |
|
624 final class InternalReadSubscription implements Flow.Subscription { |
|
625 |
|
626 private final Demand demand = new Demand(); |
|
627 final SequentialScheduler readScheduler; |
|
628 private volatile boolean completed; |
|
629 private final ReadEvent readEvent; |
|
630 private final AsyncEvent subscribeEvent; |
|
631 |
|
632 InternalReadSubscription() { |
|
633 readScheduler = new SequentialScheduler(new SocketFlowTask(this::read)); |
|
634 subscribeEvent = new AsyncTriggerEvent(this::signalError, |
|
635 this::handleSubscribeEvent); |
|
636 readEvent = new ReadEvent(channel, this); |
|
637 } |
|
638 |
|
639 /* |
|
640 * This method must be invoked before any other method of this class. |
|
641 */ |
|
642 final void signalSubscribe() { |
|
643 if (readScheduler.isStopped() || completed) { |
|
644 // if already completed or stopped we can handle any |
|
645 // pending connection directly from here. |
|
646 if (debug.on()) |
|
647 debug.log("handling pending subscription while completed"); |
|
648 handlePending(); |
|
649 } else { |
|
650 try { |
|
651 if (debug.on()) debug.log("registering subscribe event"); |
|
652 client.registerEvent(subscribeEvent); |
|
653 } catch (Throwable t) { |
|
654 signalError(t); |
|
655 handlePending(); |
|
656 } |
|
657 } |
|
658 } |
|
659 |
|
660 final void handleSubscribeEvent() { |
|
661 assert client.isSelectorThread(); |
|
662 debug.log("subscribe event raised"); |
|
663 readScheduler.runOrSchedule(); |
|
664 if (readScheduler.isStopped() || completed) { |
|
665 // if already completed or stopped we can handle any |
|
666 // pending connection directly from here. |
|
667 if (debug.on()) |
|
668 debug.log("handling pending subscription when completed"); |
|
669 handlePending(); |
|
670 } |
|
671 } |
|
672 |
|
673 |
|
674 /* |
|
675 * Although this method is thread-safe, the Reactive-Streams spec seems |
|
676 * to not require it to be as such. It's a responsibility of the |
|
677 * subscriber to signal demand in a thread-safe manner. |
|
678 * |
|
679 * See Reactive Streams specification, rules 2.7 and 3.4. |
|
680 */ |
|
681 @Override |
|
682 public final void request(long n) { |
|
683 if (n > 0L) { |
|
684 boolean wasFulfilled = demand.increase(n); |
|
685 if (wasFulfilled) { |
|
686 if (debug.on()) debug.log("got some demand for reading"); |
|
687 resumeReadEvent(); |
|
688 // if demand has been changed from fulfilled |
|
689 // to unfulfilled register read event; |
|
690 } |
|
691 } else { |
|
692 signalError(new IllegalArgumentException("non-positive request")); |
|
693 } |
|
694 debugState("leaving request("+n+"): "); |
|
695 } |
|
696 |
|
697 @Override |
|
698 public final void cancel() { |
|
699 pauseReadEvent(); |
|
700 readScheduler.stop(); |
|
701 } |
|
702 |
|
703 private void resumeReadEvent() { |
|
704 if (debug.on()) debug.log("resuming read event"); |
|
705 resumeEvent(readEvent, this::signalError); |
|
706 } |
|
707 |
|
708 private void pauseReadEvent() { |
|
709 if (debug.on()) debug.log("pausing read event"); |
|
710 pauseEvent(readEvent, this::signalError); |
|
711 } |
|
712 |
|
713 |
|
714 final void handleError() { |
|
715 assert errorRef.get() != null; |
|
716 readScheduler.runOrSchedule(); |
|
717 } |
|
718 |
|
719 final void signalError(Throwable error) { |
|
720 if (!errorRef.compareAndSet(null, error)) { |
|
721 return; |
|
722 } |
|
723 if (debug.on()) debug.log("got read error: " + error); |
|
724 readScheduler.runOrSchedule(); |
|
725 } |
|
726 |
|
727 final void signalReadable() { |
|
728 readScheduler.runOrSchedule(); |
|
729 } |
|
730 |
|
731 /** The body of the task that runs in SequentialScheduler. */ |
|
732 final void read() { |
|
733 // It is important to only call pauseReadEvent() when stopping |
|
734 // the scheduler. The event is automatically paused before |
|
735 // firing, and trying to pause it again could cause a race |
|
736 // condition between this loop, which calls tryDecrementDemand(), |
|
737 // and the thread that calls request(n), which will try to resume |
|
738 // reading. |
|
739 try { |
|
740 while(!readScheduler.isStopped()) { |
|
741 if (completed) return; |
|
742 |
|
743 // make sure we have a subscriber |
|
744 if (handlePending()) { |
|
745 if (debug.on()) |
|
746 debug.log("pending subscriber subscribed"); |
|
747 return; |
|
748 } |
|
749 |
|
750 // If an error was signaled, we might not be in the |
|
751 // the selector thread, and that is OK, because we |
|
752 // will just call onError and return. |
|
753 ReadSubscription current = subscription; |
|
754 Throwable error = errorRef.get(); |
|
755 if (current == null) { |
|
756 assert error != null; |
|
757 if (debug.on()) |
|
758 debug.log("error raised before subscriber subscribed: %s", |
|
759 (Object)error); |
|
760 return; |
|
761 } |
|
762 TubeSubscriber subscriber = current.subscriber; |
|
763 if (error != null) { |
|
764 completed = true; |
|
765 // safe to pause here because we're finished anyway. |
|
766 pauseReadEvent(); |
|
767 if (debug.on()) |
|
768 debug.log("Sending error " + error |
|
769 + " to subscriber " + subscriber); |
|
770 current.errorRef.compareAndSet(null, error); |
|
771 current.signalCompletion(); |
|
772 readScheduler.stop(); |
|
773 debugState("leaving read() loop with error: "); |
|
774 return; |
|
775 } |
|
776 |
|
777 // If we reach here then we must be in the selector thread. |
|
778 assert client.isSelectorThread(); |
|
779 if (demand.tryDecrement()) { |
|
780 // we have demand. |
|
781 try { |
|
782 List<ByteBuffer> bytes = readAvailable(); |
|
783 if (bytes == EOF) { |
|
784 if (!completed) { |
|
785 if (debug.on()) debug.log("got read EOF"); |
|
786 completed = true; |
|
787 // safe to pause here because we're finished |
|
788 // anyway. |
|
789 pauseReadEvent(); |
|
790 current.signalCompletion(); |
|
791 readScheduler.stop(); |
|
792 } |
|
793 debugState("leaving read() loop after EOF: "); |
|
794 return; |
|
795 } else if (Utils.remaining(bytes) > 0) { |
|
796 // the subscriber is responsible for offloading |
|
797 // to another thread if needed. |
|
798 if (debug.on()) |
|
799 debug.log("read bytes: " + Utils.remaining(bytes)); |
|
800 assert !current.completed; |
|
801 subscriber.onNext(bytes); |
|
802 // we could continue looping until the demand |
|
803 // reaches 0. However, that would risk starving |
|
804 // other connections (bound to other socket |
|
805 // channels) - as other selected keys activated |
|
806 // by the selector manager thread might be |
|
807 // waiting for this event to terminate. |
|
808 // So resume the read event and return now... |
|
809 resumeReadEvent(); |
|
810 debugState("leaving read() loop after onNext: "); |
|
811 return; |
|
812 } else { |
|
813 // nothing available! |
|
814 if (debug.on()) debug.log("no more bytes available"); |
|
815 // re-increment the demand and resume the read |
|
816 // event. This ensures that this loop is |
|
817 // executed again when the socket becomes |
|
818 // readable again. |
|
819 demand.increase(1); |
|
820 resumeReadEvent(); |
|
821 debugState("leaving read() loop with no bytes"); |
|
822 return; |
|
823 } |
|
824 } catch (Throwable x) { |
|
825 signalError(x); |
|
826 continue; |
|
827 } |
|
828 } else { |
|
829 if (debug.on()) debug.log("no more demand for reading"); |
|
830 // the event is paused just after firing, so it should |
|
831 // still be paused here, unless the demand was just |
|
832 // incremented from 0 to n, in which case, the |
|
833 // event will be resumed, causing this loop to be |
|
834 // invoked again when the socket becomes readable: |
|
835 // This is what we want. |
|
836 // Trying to pause the event here would actually |
|
837 // introduce a race condition between this loop and |
|
838 // request(n). |
|
839 debugState("leaving read() loop with no demand"); |
|
840 break; |
|
841 } |
|
842 } |
|
843 } catch (Throwable t) { |
|
844 if (debug.on()) debug.log("Unexpected exception in read loop", t); |
|
845 signalError(t); |
|
846 } finally { |
|
847 handlePending(); |
|
848 } |
|
849 } |
|
850 |
|
851 boolean handlePending() { |
|
852 ReadSubscription pending = pendingSubscription.getAndSet(null); |
|
853 if (pending == null) return false; |
|
854 if (debug.on()) |
|
855 debug.log("handling pending subscription for %s", |
|
856 pending.subscriber); |
|
857 ReadSubscription current = subscription; |
|
858 if (current != null && current != pending && !completed) { |
|
859 current.subscriber.dropSubscription(); |
|
860 } |
|
861 if (debug.on()) debug.log("read demand reset to 0"); |
|
862 subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to. |
|
863 pending.errorRef.compareAndSet(null, errorRef.get()); |
|
864 if (!readScheduler.isStopped()) { |
|
865 subscription = pending; |
|
866 } else { |
|
867 if (debug.on()) debug.log("socket tube is already stopped"); |
|
868 } |
|
869 if (debug.on()) debug.log("calling onSubscribe"); |
|
870 pending.signalOnSubscribe(); |
|
871 if (completed) { |
|
872 pending.errorRef.compareAndSet(null, errorRef.get()); |
|
873 pending.signalCompletion(); |
|
874 } |
|
875 return true; |
|
876 } |
|
877 } |
|
878 |
|
879 |
|
880 // A repeatable ReadEvent which is paused after firing and can |
|
881 // be resumed if required - see SocketFlowEvent; |
|
882 final class ReadEvent extends SocketFlowEvent { |
|
883 final InternalReadSubscription sub; |
|
884 ReadEvent(SocketChannel channel, InternalReadSubscription sub) { |
|
885 super(SelectionKey.OP_READ, channel); |
|
886 this.sub = sub; |
|
887 } |
|
888 @Override |
|
889 protected final void signalEvent() { |
|
890 try { |
|
891 client.eventUpdated(this); |
|
892 sub.signalReadable(); |
|
893 } catch(Throwable t) { |
|
894 sub.signalError(t); |
|
895 } |
|
896 } |
|
897 |
|
898 @Override |
|
899 protected final void signalError(Throwable error) { |
|
900 sub.signalError(error); |
|
901 } |
|
902 |
|
903 @Override |
|
904 Logger debug() { return debug; } |
|
905 } |
|
906 } |
|
907 |
|
908 // ===================================================================== // |
|
909 // Socket Channel Read/Write // |
|
910 // ===================================================================== // |
|
911 static final int MAX_BUFFERS = 3; |
|
912 static final List<ByteBuffer> EOF = List.of(); |
|
913 static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER); |
|
914 |
|
915 // readAvailable() will read bytes into the 'current' ByteBuffer until |
|
916 // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read(). |
|
917 // When that happens, a slice of the data that has been read so far |
|
918 // is inserted into the returned buffer list, and if the current buffer |
|
919 // has remaining space, that space will be used to read more data when |
|
920 // the channel becomes readable again. |
|
921 private volatile ByteBuffer current; |
|
922 private List<ByteBuffer> readAvailable() throws IOException { |
|
923 ByteBuffer buf = current; |
|
924 buf = (buf == null || !buf.hasRemaining()) |
|
925 ? (current = buffersSource.get()) : buf; |
|
926 assert buf.hasRemaining(); |
|
927 |
|
928 int read; |
|
929 int pos = buf.position(); |
|
930 List<ByteBuffer> list = null; |
|
931 while (buf.hasRemaining()) { |
|
932 try { |
|
933 while ((read = channel.read(buf)) > 0) { |
|
934 if (!buf.hasRemaining()) |
|
935 break; |
|
936 } |
|
937 } catch (IOException x) { |
|
938 if (buf.position() == pos && list == null) { |
|
939 // no bytes have been read, just throw... |
|
940 throw x; |
|
941 } else { |
|
942 // some bytes have been read, return them and fail next time |
|
943 errorRef.compareAndSet(null, x); |
|
944 read = 0; // ensures outer loop will exit |
|
945 } |
|
946 } |
|
947 |
|
948 // nothing read; |
|
949 if (buf.position() == pos) { |
|
950 // An empty list signals the end of data, and should only be |
|
951 // returned if read == -1. If some data has already been read, |
|
952 // then it must be returned. -1 will be returned next time |
|
953 // the caller attempts to read something. |
|
954 if (list == null) { |
|
955 // nothing read - list was null - return EOF or NOTHING |
|
956 list = read == -1 ? EOF : NOTHING; |
|
957 } |
|
958 break; |
|
959 } |
|
960 |
|
961 // check whether this buffer has still some free space available. |
|
962 // if so, we will keep it for the next round. |
|
963 final boolean hasRemaining = buf.hasRemaining(); |
|
964 |
|
965 // creates a slice to add to the list |
|
966 int limit = buf.limit(); |
|
967 buf.limit(buf.position()); |
|
968 buf.position(pos); |
|
969 ByteBuffer slice = buf.slice(); |
|
970 |
|
971 // restore buffer state to what it was before creating the slice |
|
972 buf.position(buf.limit()); |
|
973 buf.limit(limit); |
|
974 |
|
975 // add the buffer to the list |
|
976 list = addToList(list, slice.asReadOnlyBuffer()); |
|
977 if (read <= 0 || list.size() == MAX_BUFFERS) { |
|
978 break; |
|
979 } |
|
980 |
|
981 buf = hasRemaining ? buf : (current = buffersSource.get()); |
|
982 pos = buf.position(); |
|
983 assert buf.hasRemaining(); |
|
984 } |
|
985 return list; |
|
986 } |
|
987 |
|
988 private <T> List<T> addToList(List<T> list, T item) { |
|
989 int size = list == null ? 0 : list.size(); |
|
990 switch (size) { |
|
991 case 0: return List.of(item); |
|
992 case 1: return List.of(list.get(0), item); |
|
993 case 2: return List.of(list.get(0), list.get(1), item); |
|
994 default: // slow path if MAX_BUFFERS > 3 |
|
995 ArrayList<T> res = new ArrayList<>(list); |
|
996 res.add(item); |
|
997 return res; |
|
998 } |
|
999 } |
|
1000 |
|
1001 private long writeAvailable(List<ByteBuffer> bytes) throws IOException { |
|
1002 ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY); |
|
1003 final long remaining = Utils.remaining(srcs); |
|
1004 long written = 0; |
|
1005 while (remaining > written) { |
|
1006 try { |
|
1007 long w = channel.write(srcs); |
|
1008 assert w >= 0 : "negative number of bytes written:" + w; |
|
1009 if (w == 0) { |
|
1010 break; |
|
1011 } |
|
1012 written += w; |
|
1013 } catch (IOException x) { |
|
1014 if (written == 0) { |
|
1015 // no bytes were written just throw |
|
1016 throw x; |
|
1017 } else { |
|
1018 // return how many bytes were written, will fail next time |
|
1019 break; |
|
1020 } |
|
1021 } |
|
1022 } |
|
1023 return written; |
|
1024 } |
|
1025 |
|
1026 private void resumeEvent(SocketFlowEvent event, |
|
1027 Consumer<Throwable> errorSignaler) { |
|
1028 boolean registrationRequired; |
|
1029 synchronized(lock) { |
|
1030 registrationRequired = !event.registered(); |
|
1031 event.resume(); |
|
1032 } |
|
1033 try { |
|
1034 if (registrationRequired) { |
|
1035 client.registerEvent(event); |
|
1036 } else { |
|
1037 client.eventUpdated(event); |
|
1038 } |
|
1039 } catch(Throwable t) { |
|
1040 errorSignaler.accept(t); |
|
1041 } |
|
1042 } |
|
1043 |
|
1044 private void pauseEvent(SocketFlowEvent event, |
|
1045 Consumer<Throwable> errorSignaler) { |
|
1046 synchronized(lock) { |
|
1047 event.pause(); |
|
1048 } |
|
1049 try { |
|
1050 client.eventUpdated(event); |
|
1051 } catch(Throwable t) { |
|
1052 errorSignaler.accept(t); |
|
1053 } |
|
1054 } |
|
1055 |
|
1056 @Override |
|
1057 public void connectFlows(TubePublisher writePublisher, |
|
1058 TubeSubscriber readSubscriber) { |
|
1059 if (debug.on()) debug.log("connecting flows"); |
|
1060 this.subscribe(readSubscriber); |
|
1061 writePublisher.subscribe(this); |
|
1062 } |
|
1063 |
|
1064 |
|
1065 @Override |
|
1066 public String toString() { |
|
1067 return dbgString(); |
|
1068 } |
|
1069 |
|
1070 final String dbgString() { |
|
1071 return "SocketTube("+id+")"; |
|
1072 } |
|
1073 } |