23 * questions. |
23 * questions. |
24 */ |
24 */ |
25 |
25 |
26 package jdk.internal.net.http; |
26 package jdk.internal.net.http; |
27 |
27 |
28 import java.io.EOFException; |
|
29 import java.io.IOException; |
28 import java.io.IOException; |
30 import java.lang.System.Logger.Level; |
29 import java.lang.System.Logger.Level; |
31 import java.nio.ByteBuffer; |
30 import java.nio.ByteBuffer; |
32 import java.util.List; |
31 import java.util.List; |
33 import java.util.Objects; |
32 import java.util.Objects; |
38 import java.nio.channels.SelectionKey; |
37 import java.nio.channels.SelectionKey; |
39 import java.nio.channels.SocketChannel; |
38 import java.nio.channels.SocketChannel; |
40 import java.util.ArrayList; |
39 import java.util.ArrayList; |
41 import java.util.function.Consumer; |
40 import java.util.function.Consumer; |
42 import java.util.function.Supplier; |
41 import java.util.function.Supplier; |
43 |
|
44 import jdk.internal.net.http.common.Demand; |
42 import jdk.internal.net.http.common.Demand; |
45 import jdk.internal.net.http.common.FlowTube; |
43 import jdk.internal.net.http.common.FlowTube; |
46 import jdk.internal.net.http.common.SequentialScheduler; |
44 import jdk.internal.net.http.common.SequentialScheduler; |
47 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; |
45 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; |
48 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask; |
46 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask; |
49 import jdk.internal.net.http.common.Utils; |
47 import jdk.internal.net.http.common.Utils; |
50 |
48 |
51 /** |
49 /** |
52 * A SocketTube is a terminal tube plugged directly into the socket. |
50 * A SocketTube is a terminal tube plugged directly into the socket. |
53 * The read subscriber should call {@code subscribe} on the SocketTube before |
51 * The read subscriber should call {@code subscribe} on the SocketTube before |
54 * the SocketTube can be subscribed to the write publisher. |
52 * the SocketTube is subscribed to the write publisher. |
55 */ |
53 */ |
56 final class SocketTube implements FlowTube { |
54 final class SocketTube implements FlowTube { |
57 |
55 |
58 static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag |
56 static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag |
59 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
57 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
146 // ===================================================================== // |
144 // ===================================================================== // |
147 // Events // |
145 // Events // |
148 // ======================================================================// |
146 // ======================================================================// |
149 |
147 |
150 void signalClosed() { |
148 void signalClosed() { |
151 // Ensure that the subscriber will be terminated |
149 // Ensures that the subscriber will be terminated and that future |
152 // and that future subscribers will be notified |
150 // subscribers will be notified when the connection is closed. |
153 // when the connection is closed. |
|
154 readPublisher.subscriptionImpl.signalError( |
151 readPublisher.subscriptionImpl.signalError( |
155 new IOException("connection closed locally")); |
152 new IOException("connection closed locally")); |
156 } |
153 } |
157 |
154 |
158 /** |
155 /** |
175 taskCompleter.complete(); |
172 taskCompleter.complete(); |
176 } |
173 } |
177 } |
174 } |
178 } |
175 } |
179 |
176 |
180 // This is best effort - there's no guarantee that the printed set |
177 // This is best effort - there's no guarantee that the printed set of values |
181 // of values is consistent. It should only be considered as |
178 // is consistent. It should only be considered as weakly accurate - in |
182 // weakly accurate - in particular in what concerns the events states, |
179 // particular in what concerns the events states, especially when displaying |
183 // especially when displaying a read event state from a write event |
180 // a read event state from a write event callback and conversely. |
184 // callback and conversely. |
|
185 void debugState(String when) { |
181 void debugState(String when) { |
186 if (debug.isLoggable(Level.DEBUG)) { |
182 if (debug.isLoggable(Level.DEBUG)) { |
187 StringBuilder state = new StringBuilder(); |
183 StringBuilder state = new StringBuilder(); |
188 |
184 |
189 InternalReadPublisher.InternalReadSubscription sub = |
185 InternalReadPublisher.InternalReadSubscription sub = |
209 debug.log(Level.DEBUG, state.toString()); |
205 debug.log(Level.DEBUG, state.toString()); |
210 } |
206 } |
211 } |
207 } |
212 |
208 |
213 /** |
209 /** |
214 * A repeatable event that can be paused or resumed by changing |
210 * A repeatable event that can be paused or resumed by changing its |
215 * its interestOps. |
211 * interestOps. When the event is fired, it is first paused before being |
216 * When the event is fired, it is first paused before being signaled. |
212 * signaled. It is the responsibility of the code triggered by |
217 * It is the responsibility of the code triggered by {@code signalEvent} |
213 * {@code signalEvent} to resume the event if required. |
218 * to resume the event if required. |
|
219 */ |
214 */ |
220 private static abstract class SocketFlowEvent extends AsyncEvent { |
215 private static abstract class SocketFlowEvent extends AsyncEvent { |
221 final SocketChannel channel; |
216 final SocketChannel channel; |
222 final int defaultInterest; |
217 final int defaultInterest; |
223 volatile int interestOps; |
218 volatile int interestOps; |
257 |
252 |
258 // ===================================================================== // |
253 // ===================================================================== // |
259 // Writing // |
254 // Writing // |
260 // ======================================================================// |
255 // ======================================================================// |
261 |
256 |
262 // This class makes the assumption that the publisher will call |
257 // This class makes the assumption that the publisher will call onNext |
263 // onNext sequentially, and that onNext won't be called if the demand |
258 // sequentially, and that onNext won't be called if the demand has not been |
264 // has not been incremented by request(1). |
259 // incremented by request(1). |
265 // It has a 'queue of 1' meaning that it will call request(1) in |
260 // It has a 'queue of 1' meaning that it will call request(1) in |
266 // onSubscribe, and then only after its 'current' buffer list has been |
261 // onSubscribe, and then only after its 'current' buffer list has been |
267 // fully written and current set to null; |
262 // fully written and current set to null; |
268 private final class InternalWriteSubscriber |
263 private final class InternalWriteSubscriber |
269 implements Flow.Subscriber<List<ByteBuffer>> { |
264 implements Flow.Subscriber<List<ByteBuffer>> { |
310 // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write |
305 // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write |
311 // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent |
306 // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent |
312 debugState("leaving w.onNext"); |
307 debugState("leaving w.onNext"); |
313 } |
308 } |
314 |
309 |
315 // we don't use a SequentialScheduler here: we rely on |
310 // Don't use a SequentialScheduler here: rely on onNext() being invoked |
316 // onNext() being called sequentially, and not being called |
311 // sequentially, and not being invoked if there is no demand, request(1). |
317 // if we haven't called request(1) |
312 // onNext is usually called from within a user / executor thread. |
318 // onNext is usually called from within a user/executor thread. |
313 // Initial writing will be performed in that thread. If for some reason, |
319 // we will perform the initial writing in that thread. |
314 // not all the data can be written, a writeEvent will be registered, and |
320 // if for some reason, not all data can be written, the writeEvent |
315 // writing will resume in the selector manager thread when the |
321 // will be resumed, and the rest of the data will be written from |
316 // writeEvent is fired. |
322 // the selector manager thread when the writeEvent is fired. |
317 // |
323 // If we are in the selector manager thread, then we will use the executor |
318 // If this method is invoked in the selector manager thread (because of |
324 // to call request(1), ensuring that onNext() won't be called from |
319 // a writeEvent), then the executor will be used to invoke request(1), |
325 // within the selector thread. |
320 // ensuring that onNext() won't be invoked from within the selector |
326 // If we are not in the selector manager thread, then we don't care. |
321 // thread. If not in the selector manager thread, then request(1) is |
|
322 // invoked directly. |
327 void tryFlushCurrent(boolean inSelectorThread) { |
323 void tryFlushCurrent(boolean inSelectorThread) { |
328 List<ByteBuffer> bufs = current; |
324 List<ByteBuffer> bufs = current; |
329 if (bufs == null) return; |
325 if (bufs == null) return; |
330 try { |
326 try { |
331 assert inSelectorThread == client.isSelectorThread() : |
327 assert inSelectorThread == client.isSelectorThread() : |
333 + " be in the selector thread"; |
329 + " be in the selector thread"; |
334 long remaining = Utils.remaining(bufs); |
330 long remaining = Utils.remaining(bufs); |
335 debug.log(Level.DEBUG, "trying to write: %d", remaining); |
331 debug.log(Level.DEBUG, "trying to write: %d", remaining); |
336 long written = writeAvailable(bufs); |
332 long written = writeAvailable(bufs); |
337 debug.log(Level.DEBUG, "wrote: %d", written); |
333 debug.log(Level.DEBUG, "wrote: %d", written); |
338 if (written == -1) { |
334 assert written >= 0 : "negative number of bytes written:" + written; |
339 signalError(new EOFException("EOF reached while writing")); |
|
340 return; |
|
341 } |
|
342 assert written <= remaining; |
335 assert written <= remaining; |
343 if (remaining - written == 0) { |
336 if (remaining - written == 0) { |
344 current = null; |
337 current = null; |
345 if (writeDemand.tryDecrement()) { |
338 if (writeDemand.tryDecrement()) { |
346 Runnable requestMore = this::requestMore; |
339 Runnable requestMore = this::requestMore; |
359 signalError(t); |
352 signalError(t); |
360 subscription.cancel(); |
353 subscription.cancel(); |
361 } |
354 } |
362 } |
355 } |
363 |
356 |
364 // Kick off the initial request:1 that will start |
357 // Kick off the initial request:1 that will start the writing side. |
365 // the writing side. Called from the selector manager |
358 // Invoked in the selector manager thread. |
366 // thread. |
|
367 void startSubscription() { |
359 void startSubscription() { |
368 try { |
360 try { |
369 debug.log(Level.DEBUG, "write: starting subscription"); |
361 debug.log(Level.DEBUG, "write: starting subscription"); |
370 assert client.isSelectorThread(); |
362 assert client.isSelectorThread(); |
371 // make sure read registrations are handled before; |
363 // make sure read registrations are handled before; |
971 final long remaining = Utils.remaining(srcs); |
963 final long remaining = Utils.remaining(srcs); |
972 long written = 0; |
964 long written = 0; |
973 while (remaining > written) { |
965 while (remaining > written) { |
974 try { |
966 try { |
975 long w = channel.write(srcs); |
967 long w = channel.write(srcs); |
976 if (w == -1 && written == 0) return -1; |
968 assert w >= 0 : "negative number of bytes written:" + w; |
977 if (w == 0) break; |
969 if (w == 0) { |
|
970 break; |
|
971 } |
978 written += w; |
972 written += w; |
979 } catch (IOException x) { |
973 } catch (IOException x) { |
980 // if no bytes were written, just throw... |
974 if (written == 0) { |
981 if (written == 0) throw x; |
975 // no bytes were written: just throw |
982 // otherwise return how many bytes were |
976 throw x; |
983 // written: we will fail next time. |
977 } else { |
984 break; |
978 // return how many bytes were written, will fail next time |
|
979 break; |
|
980 } |
985 } |
981 } |
986 } |
982 } |
987 return written; |
983 return written; |
988 } |
984 } |
989 |
985 |