31 import javax.net.ssl.SSLEngineResult; |
31 import javax.net.ssl.SSLEngineResult; |
32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; |
32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; |
33 import javax.net.ssl.SSLEngineResult.Status; |
33 import javax.net.ssl.SSLEngineResult.Status; |
34 import javax.net.ssl.SSLException; |
34 import javax.net.ssl.SSLException; |
35 import java.io.IOException; |
35 import java.io.IOException; |
36 import java.lang.System.Logger.Level; |
|
37 import java.nio.ByteBuffer; |
36 import java.nio.ByteBuffer; |
38 import java.util.ArrayList; |
37 import java.util.ArrayList; |
39 import java.util.Collections; |
38 import java.util.Collections; |
40 import java.util.Iterator; |
39 import java.util.Iterator; |
41 import java.util.LinkedList; |
40 import java.util.LinkedList; |
108 public SSLFlowDelegate(SSLEngine engine, |
109 public SSLFlowDelegate(SSLEngine engine, |
109 Executor exec, |
110 Executor exec, |
110 Subscriber<? super List<ByteBuffer>> downReader, |
111 Subscriber<? super List<ByteBuffer>> downReader, |
111 Subscriber<? super List<ByteBuffer>> downWriter) |
112 Subscriber<? super List<ByteBuffer>> downWriter) |
112 { |
113 { |
|
114 this(engine, exec, null, downReader, downWriter); |
|
115 } |
|
116 |
|
117 /** |
|
118 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each |
|
119 * Flow.Subscriber requires an associated {@link CompletableFuture} |
|
120 * for errors that need to be signaled from downstream to upstream. |
|
121 */ |
|
122 public SSLFlowDelegate(SSLEngine engine, |
|
123 Executor exec, |
|
124 Consumer<ByteBuffer> recycler, |
|
125 Subscriber<? super List<ByteBuffer>> downReader, |
|
126 Subscriber<? super List<ByteBuffer>> downWriter) |
|
127 { |
113 this.id = scount.getAndIncrement(); |
128 this.id = scount.getAndIncrement(); |
114 this.tubeName = String.valueOf(downWriter); |
129 this.tubeName = String.valueOf(downWriter); |
|
130 this.recycler = recycler; |
115 this.reader = new Reader(); |
131 this.reader = new Reader(); |
116 this.writer = new Writer(); |
132 this.writer = new Writer(); |
117 this.engine = engine; |
133 this.engine = engine; |
118 this.exec = exec; |
134 this.exec = exec; |
119 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
135 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
179 public String monitor() { |
195 public String monitor() { |
180 StringBuilder sb = new StringBuilder(); |
196 StringBuilder sb = new StringBuilder(); |
181 sb.append("SSL: id ").append(id); |
197 sb.append("SSL: id ").append(id); |
182 sb.append(" HS state: " + states(handshakeState)); |
198 sb.append(" HS state: " + states(handshakeState)); |
183 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); |
199 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); |
184 sb.append(" LL : "); |
200 if (stateList != null) { |
185 for (String s: stateList) { |
201 sb.append(" LL : "); |
186 sb.append(s).append(" "); |
202 for (String s : stateList) { |
|
203 sb.append(s).append(" "); |
|
204 } |
187 } |
205 } |
188 sb.append("\r\n"); |
206 sb.append("\r\n"); |
189 sb.append("Reader:: ").append(reader.toString()); |
207 sb.append("Reader:: ").append(reader.toString()); |
190 sb.append("\r\n"); |
208 sb.append("\r\n"); |
191 sb.append("Writer:: ").append(writer.toString()); |
209 sb.append("Writer:: ").append(writer.toString()); |
211 * OK: return generated buffers. |
229 * OK: return generated buffers. |
212 * |
230 * |
213 * Upstream subscription strategy is to try and keep no more than |
231 * Upstream subscription strategy is to try and keep no more than |
214 * TARGET_BUFSIZE bytes in readBuf |
232 * TARGET_BUFSIZE bytes in readBuf |
215 */ |
233 */ |
216 class Reader extends SubscriberWrapper { |
234 final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber { |
|
235 // Maximum record size is 16k. |
|
236 // Because SocketTube can feed us up to 3 16K buffers, |
|
237 // then setting this size to 16K means that the readBuf |
|
238 // can store up to 64K-1 (16K-1 + 3*16K) |
|
239 static final int TARGET_BUFSIZE = 16 * 1024; |
|
240 |
217 final SequentialScheduler scheduler; |
241 final SequentialScheduler scheduler; |
218 static final int TARGET_BUFSIZE = 16 * 1024; |
|
219 volatile ByteBuffer readBuf; |
242 volatile ByteBuffer readBuf; |
220 volatile boolean completing; |
243 volatile boolean completing; |
221 final Object readBufferLock = new Object(); |
244 final Object readBufferLock = new Object(); |
222 final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
245 final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
223 |
246 |
224 class ReaderDownstreamPusher implements Runnable { |
247 private final class ReaderDownstreamPusher implements Runnable { |
225 @Override public void run() { processData(); } |
248 @Override public void run() { processData(); } |
226 } |
249 } |
227 |
250 |
228 Reader() { |
251 Reader() { |
229 super(); |
252 super(); |
230 scheduler = SequentialScheduler.synchronizedScheduler( |
253 scheduler = SequentialScheduler.synchronizedScheduler( |
231 new ReaderDownstreamPusher()); |
254 new ReaderDownstreamPusher()); |
232 this.readBuf = ByteBuffer.allocate(1024); |
255 this.readBuf = ByteBuffer.allocate(1024); |
233 readBuf.limit(0); // keep in read mode |
256 readBuf.limit(0); // keep in read mode |
|
257 } |
|
258 |
|
259 @Override |
|
260 public boolean supportsRecycling() { |
|
261 return recycler != null; |
234 } |
262 } |
235 |
263 |
236 protected SchedulingAction enterScheduling() { |
264 protected SchedulingAction enterScheduling() { |
237 return enterReadScheduling(); |
265 return enterReadScheduling(); |
238 } |
266 } |
283 readBuf.compact(); |
314 readBuf.compact(); |
284 while (readBuf.remaining() < buf.remaining()) |
315 while (readBuf.remaining() < buf.remaining()) |
285 reallocReadBuf(); |
316 reallocReadBuf(); |
286 readBuf.put(buf); |
317 readBuf.put(buf); |
287 readBuf.flip(); |
318 readBuf.flip(); |
|
319 // should be safe to call inside lock |
|
320 // since the only implementation |
|
321 // offers the buffer to an unbounded queue. |
|
322 // WARNING: do not touch buf after this point! |
|
323 if (recycler != null) recycler.accept(buf); |
288 } |
324 } |
289 if (complete) { |
325 if (complete) { |
290 this.completing = complete; |
326 this.completing = complete; |
291 } |
327 } |
292 } |
328 } |
293 } |
329 } |
294 |
330 |
295 void schedule() { |
331 void schedule() { |
296 scheduler.runOrSchedule(); |
332 scheduler.runOrSchedule(exec); |
297 } |
333 } |
298 |
334 |
299 void stop() { |
335 void stop() { |
300 if (debugr.on()) debugr.log("stop"); |
336 if (debugr.on()) debugr.log("stop"); |
301 scheduler.stop(); |
337 scheduler.stop(); |
302 } |
338 } |
303 |
339 |
304 AtomicInteger count = new AtomicInteger(0); |
340 AtomicInteger count = new AtomicInteger(0); |
305 |
341 |
|
342 // minimum number of bytes required to call unwrap. |
|
343 // Usually this is 0, unless there was a buffer underflow. |
|
344 // In this case we need to wait for more bytes than what |
|
345 // we had before calling unwrap() again. |
|
346 volatile int minBytesRequired; |
306 // work function where it all happens |
347 // work function where it all happens |
307 void processData() { |
348 final void processData() { |
308 try { |
349 try { |
309 if (debugr.on()) |
350 if (debugr.on()) |
310 debugr.log("processData:" |
351 debugr.log("processData:" |
311 + " readBuf remaining:" + readBuf.remaining() |
352 + " readBuf remaining:" + readBuf.remaining() |
312 + ", state:" + states(handshakeState) |
353 + ", state:" + states(handshakeState) |
313 + ", engine handshake status:" + engine.getHandshakeStatus()); |
354 + ", engine handshake status:" + engine.getHandshakeStatus()); |
314 int len; |
355 int len; |
315 boolean complete = false; |
356 boolean complete = false; |
316 while ((len = readBuf.remaining()) > 0) { |
357 while (readBuf.remaining() > (len = minBytesRequired)) { |
317 boolean handshaking = false; |
358 boolean handshaking = false; |
318 try { |
359 try { |
319 EngineResult result; |
360 EngineResult result; |
320 synchronized (readBufferLock) { |
361 synchronized (readBufferLock) { |
321 complete = this.completing; |
362 complete = this.completing; |
|
363 if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining()); |
|
364 // Unless there is a BUFFER_UNDERFLOW, we should try to |
|
365 // unwrap any number of bytes. Set minBytesRequired to 0: |
|
366 // we only need to do that if minBytesRequired is not already 0. |
|
367 len = len > 0 ? minBytesRequired = 0 : len; |
322 result = unwrapBuffer(readBuf); |
368 result = unwrapBuffer(readBuf); |
323 if (debugr.on()) |
369 len = readBuf.remaining(); |
324 debugr.log("Unwrapped: %s", result.result); |
370 if (debugr.on()) { |
|
371 debugr.log("Unwrapped: result: %s", result.result); |
|
372 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed()); |
|
373 } |
325 } |
374 } |
326 if (result.bytesProduced() > 0) { |
375 if (result.bytesProduced() > 0) { |
327 if (debugr.on()) |
376 if (debugr.on()) |
328 debugr.log("sending %d", result.bytesProduced()); |
377 debugr.log("sending %d", result.bytesProduced()); |
329 count.addAndGet(result.bytesProduced()); |
378 count.addAndGet(result.bytesProduced()); |
330 outgoing(result.destBuffer, false); |
379 outgoing(result.destBuffer, false); |
331 } |
380 } |
332 if (result.status() == Status.BUFFER_UNDERFLOW) { |
381 if (result.status() == Status.BUFFER_UNDERFLOW) { |
333 if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); |
382 if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); |
334 // not enough data in the read buffer... |
383 // not enough data in the read buffer... |
|
384 // no need to try to unwrap again unless we get more bytes |
|
385 // than minBytesRequired = len in the read buffer. |
|
386 minBytesRequired = len; |
|
387 synchronized (readBufferLock) { |
|
388 // more bytes could already have been added... |
|
389 assert readBuf.remaining() >= len; |
|
390 // check if we have received some data, and if so |
|
391 // we can just re-spin the loop |
|
392 if (readBuf.remaining() > len) continue; |
|
393 } |
|
394 // request more data and return. |
335 requestMore(); |
395 requestMore(); |
336 synchronized (readBufferLock) { |
396 return; |
337 // check if we have received some data |
|
338 if (readBuf.remaining() > len) continue; |
|
339 return; |
|
340 } |
|
341 } |
397 } |
342 if (complete && result.status() == Status.CLOSED) { |
398 if (complete && result.status() == Status.CLOSED) { |
343 if (debugr.on()) debugr.log("Closed: completing"); |
399 if (debugr.on()) debugr.log("Closed: completing"); |
344 outgoing(Utils.EMPTY_BB_LIST, true); |
400 outgoing(Utils.EMPTY_BB_LIST, true); |
345 return; |
401 return; |
757 reader.schedule(); |
828 reader.schedule(); |
758 writer.schedule(); |
829 writer.schedule(); |
759 } |
830 } |
760 |
831 |
761 final AtomicInteger handshakeState; |
832 final AtomicInteger handshakeState; |
762 final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>(); |
833 final ConcurrentLinkedQueue<String> stateList = |
|
834 debug.on() ? new ConcurrentLinkedQueue<>() : null; |
763 |
835 |
764 private boolean doHandshake(EngineResult r, int caller) { |
836 private boolean doHandshake(EngineResult r, int caller) { |
765 // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS |
837 // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS |
766 handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS)); |
838 handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS)); |
767 stateList.add(r.handshakeStatus().toString()); |
839 if (stateList != null && debug.on()) { |
768 stateList.add(Integer.toString(caller)); |
840 stateList.add(r.handshakeStatus().toString()); |
|
841 stateList.add(Integer.toString(caller)); |
|
842 } |
769 switch (r.handshakeStatus()) { |
843 switch (r.handshakeStatus()) { |
770 case NEED_TASK: |
844 case NEED_TASK: |
771 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS); |
845 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS); |
772 if ((s & DOING_TASKS) > 0) // someone else was doing tasks |
846 if ((s & DOING_TASKS) > 0) // someone else was doing tasks |
773 return false; |
847 return false; |
837 // an acknowledgement back. We're calling doHandshake |
910 // an acknowledgement back. We're calling doHandshake |
838 // to finish the close handshake. |
911 // to finish the close handshake. |
839 if (engine.isInboundDone() && !engine.isOutboundDone()) { |
912 if (engine.isInboundDone() && !engine.isOutboundDone()) { |
840 if (debug.on()) debug.log("doClosure: close_notify received"); |
913 if (debug.on()) debug.log("doClosure: close_notify received"); |
841 close_notify_received = true; |
914 close_notify_received = true; |
842 doHandshake(r, READER); |
915 if (!writer.scheduler.isStopped()) { |
|
916 doHandshake(r, READER); |
|
917 } else { |
|
918 // We have received close_notify, but we |
|
919 // won't be able to send the acknowledgement. |
|
920 // Nothing more will come from the socket either, |
|
921 // so mark the reader as completed. |
|
922 synchronized (reader.readBufferLock) { |
|
923 reader.completing = true; |
|
924 } |
|
925 } |
843 } |
926 } |
844 } |
927 } |
845 return r; |
928 return r; |
846 } |
929 } |
847 |
930 |
912 SSLEngineResult.Status status() { |
995 SSLEngineResult.Status status() { |
913 return result.getStatus(); |
996 return result.getStatus(); |
914 } |
997 } |
915 } |
998 } |
916 |
999 |
917 public ByteBuffer getNetBuffer() { |
1000 volatile int packetBufferSize; |
918 return ByteBuffer.allocate(engine.getSession().getPacketBufferSize()); |
1001 final ByteBuffer getNetBuffer() { |
919 } |
1002 int netSize = packetBufferSize; |
920 |
1003 if (netSize <= 0) { |
921 private ByteBuffer getAppBuffer() { |
1004 packetBufferSize = netSize = engine.getSession().getPacketBufferSize(); |
922 return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()); |
1005 } |
|
1006 return ByteBuffer.allocate(netSize); |
|
1007 } |
|
1008 |
|
1009 volatile int applicationBufferSize; |
|
1010 final ByteBuffer getAppBuffer() { |
|
1011 int appSize = applicationBufferSize; |
|
1012 if (appSize <= 0) { |
|
1013 applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize(); |
|
1014 } |
|
1015 return ByteBuffer.allocate(appSize); |
923 } |
1016 } |
924 |
1017 |
925 final String dbgString() { |
1018 final String dbgString() { |
926 return "SSLFlowDelegate(" + tubeName + ")"; |
1019 return "SSLFlowDelegate(" + tubeName + ")"; |
927 } |
1020 } |