85 public class SSLFlowDelegate { |
86 public class SSLFlowDelegate { |
86 |
87 |
87 final Logger debug = |
88 final Logger debug = |
88 Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
89 Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
89 |
90 |
|
91 private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
|
92 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); |
|
93 // When handshake is in progress trying to wrap may produce no bytes. |
|
94 private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); |
|
95 private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); |
|
96 |
90 final Executor exec; |
97 final Executor exec; |
91 final Reader reader; |
98 final Reader reader; |
92 final Writer writer; |
99 final Writer writer; |
93 final SSLEngine engine; |
100 final SSLEngine engine; |
94 final String tubeName; // hack |
101 final String tubeName; // hack |
95 final CompletableFuture<String> alpnCF; // completes on initial handshake |
102 final CompletableFuture<String> alpnCF; // completes on initial handshake |
96 final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
|
97 volatile boolean close_notify_received; |
103 volatile boolean close_notify_received; |
98 final CompletableFuture<Void> readerCF; |
104 final CompletableFuture<Void> readerCF; |
99 final CompletableFuture<Void> writerCF; |
105 final CompletableFuture<Void> writerCF; |
100 final Consumer<ByteBuffer> recycler; |
106 final Consumer<ByteBuffer> recycler; |
101 static AtomicInteger scount = new AtomicInteger(1); |
107 static AtomicInteger scount = new AtomicInteger(1); |
243 volatile boolean completing; |
250 volatile boolean completing; |
244 final Object readBufferLock = new Object(); |
251 final Object readBufferLock = new Object(); |
245 final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
252 final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
246 |
253 |
247 private final class ReaderDownstreamPusher implements Runnable { |
254 private final class ReaderDownstreamPusher implements Runnable { |
248 @Override public void run() { processData(); } |
255 @Override |
|
256 public void run() { |
|
257 processData(); |
|
258 } |
249 } |
259 } |
250 |
260 |
251 Reader() { |
261 Reader() { |
252 super(); |
262 super(); |
253 scheduler = SequentialScheduler.synchronizedScheduler( |
263 scheduler = SequentialScheduler.synchronizedScheduler( |
254 new ReaderDownstreamPusher()); |
264 new ReaderDownstreamPusher()); |
255 this.readBuf = ByteBuffer.allocate(1024); |
265 this.readBuf = ByteBuffer.allocate(1024); |
256 readBuf.limit(0); // keep in read mode |
266 readBuf.limit(0); // keep in read mode |
257 } |
267 } |
258 |
268 |
259 @Override |
269 @Override |
287 + " count: " + count.toString(); |
297 + " count: " + count.toString(); |
288 } |
298 } |
289 |
299 |
290 private void reallocReadBuf() { |
300 private void reallocReadBuf() { |
291 int sz = readBuf.capacity(); |
301 int sz = readBuf.capacity(); |
292 ByteBuffer newb = ByteBuffer.allocate(sz*2); |
302 ByteBuffer newb = ByteBuffer.allocate(sz * 2); |
293 readBuf.flip(); |
303 readBuf.flip(); |
294 Utils.copy(readBuf, newb); |
304 Utils.copy(readBuf, newb); |
295 readBuf = newb; |
305 readBuf = newb; |
296 } |
306 } |
297 |
307 |
298 @Override |
308 @Override |
299 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { |
309 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { |
300 if (readBuf.remaining() > TARGET_BUFSIZE) { |
310 if (readBuf.remaining() > TARGET_BUFSIZE) { |
301 if (debugr.on()) |
311 if (debugr.on()) |
302 debugr.log("readBuf has more than TARGET_BUFSIZE: %d", |
312 debugr.log("readBuf has more than TARGET_BUFSIZE: %d", |
303 readBuf.remaining()); |
313 readBuf.remaining()); |
304 return 0; |
314 return 0; |
305 } else { |
315 } else { |
306 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); |
316 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); |
307 } |
317 } |
308 } |
318 } |
309 |
319 |
310 // readBuf is kept ready for reading outside of this method |
320 // readBuf is kept ready for reading outside of this method |
311 private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) { |
321 private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) { |
|
322 assert Utils.remaining(buffers) > 0 || buffers.isEmpty(); |
312 synchronized (readBufferLock) { |
323 synchronized (readBufferLock) { |
313 for (ByteBuffer buf : buffers) { |
324 for (ByteBuffer buf : buffers) { |
314 readBuf.compact(); |
325 readBuf.compact(); |
315 while (readBuf.remaining() < buf.remaining()) |
326 while (readBuf.remaining() < buf.remaining()) |
316 reallocReadBuf(); |
327 reallocReadBuf(); |
342 // minimum number of bytes required to call unwrap. |
353 // minimum number of bytes required to call unwrap. |
343 // Usually this is 0, unless there was a buffer underflow. |
354 // Usually this is 0, unless there was a buffer underflow. |
344 // In this case we need to wait for more bytes than what |
355 // In this case we need to wait for more bytes than what |
345 // we had before calling unwrap() again. |
356 // we had before calling unwrap() again. |
346 volatile int minBytesRequired; |
357 volatile int minBytesRequired; |
|
358 |
347 // work function where it all happens |
359 // work function where it all happens |
348 final void processData() { |
360 final void processData() { |
349 try { |
361 try { |
350 if (debugr.on()) |
362 if (debugr.on()) |
351 debugr.log("processData:" |
363 debugr.log("processData:" |
352 + " readBuf remaining:" + readBuf.remaining() |
364 + " readBuf remaining:" + readBuf.remaining() |
353 + ", state:" + states(handshakeState) |
365 + ", state:" + states(handshakeState) |
354 + ", engine handshake status:" + engine.getHandshakeStatus()); |
366 + ", engine handshake status:" + engine.getHandshakeStatus()); |
355 int len; |
367 int len; |
356 boolean complete = false; |
368 boolean complete = false; |
357 while (readBuf.remaining() > (len = minBytesRequired)) { |
369 while (readBuf.remaining() > (len = minBytesRequired)) { |
358 boolean handshaking = false; |
370 boolean handshaking = false; |
359 try { |
371 try { |
398 if (complete && result.status() == Status.CLOSED) { |
410 if (complete && result.status() == Status.CLOSED) { |
399 if (debugr.on()) debugr.log("Closed: completing"); |
411 if (debugr.on()) debugr.log("Closed: completing"); |
400 outgoing(Utils.EMPTY_BB_LIST, true); |
412 outgoing(Utils.EMPTY_BB_LIST, true); |
401 return; |
413 return; |
402 } |
414 } |
403 if (result.handshaking() && !complete) { |
415 if (result.handshaking()) { |
|
416 handshaking = true; |
404 if (debugr.on()) debugr.log("handshaking"); |
417 if (debugr.on()) debugr.log("handshaking"); |
405 if (doHandshake(result, READER)) { |
418 if (doHandshake(result, READER)) continue; // need unwrap |
406 resumeActivity(); |
419 else break; // doHandshake will have triggered the write scheduler if necessary |
407 } |
|
408 handshaking = true; |
|
409 } else { |
420 } else { |
410 if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) { |
421 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { |
411 handshaking = false; |
422 handshaking = false; |
412 applicationBufferSize = engine.getSession().getApplicationBufferSize(); |
423 applicationBufferSize = engine.getSession().getApplicationBufferSize(); |
413 packetBufferSize = engine.getSession().getPacketBufferSize(); |
424 packetBufferSize = engine.getSession().getPacketBufferSize(); |
414 setALPN(); |
425 setALPN(); |
415 resumeActivity(); |
426 resumeActivity(); |
441 } |
452 } |
442 } |
453 } |
443 |
454 |
444 EngineResult unwrapBuffer(ByteBuffer src) throws IOException { |
455 EngineResult unwrapBuffer(ByteBuffer src) throws IOException { |
445 ByteBuffer dst = getAppBuffer(); |
456 ByteBuffer dst = getAppBuffer(); |
|
457 int len = src.remaining(); |
446 while (true) { |
458 while (true) { |
447 SSLEngineResult sslResult = engine.unwrap(src, dst); |
459 SSLEngineResult sslResult = engine.unwrap(src, dst); |
448 switch (sslResult.getStatus()) { |
460 switch (sslResult.getStatus()) { |
449 case BUFFER_OVERFLOW: |
461 case BUFFER_OVERFLOW: |
450 // may happen only if app size buffer was changed. |
462 // may happen if app size buffer was changed, or if |
451 // get it again if app buffer size changed |
463 // our 'adaptiveBufferSize' guess was too small for |
|
464 // the current payload. In that case, update the |
|
465 // value of applicationBufferSize, and allocate a |
|
466 // buffer of that size, which we are sure will be |
|
467 // big enough to decode whatever needs to be |
|
468 // decoded. We will later update adaptiveBufferSize |
|
469 // in OK: below. |
452 int appSize = applicationBufferSize = |
470 int appSize = applicationBufferSize = |
453 engine.getSession().getApplicationBufferSize(); |
471 engine.getSession().getApplicationBufferSize(); |
454 ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); |
472 ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); |
455 dst.flip(); |
473 dst.flip(); |
456 b.put(dst); |
474 b.put(dst); |
457 dst = b; |
475 dst = b; |
458 break; |
476 break; |
459 case CLOSED: |
477 case CLOSED: |
|
478 assert dst.position() == 0; |
460 return doClosure(new EngineResult(sslResult)); |
479 return doClosure(new EngineResult(sslResult)); |
461 case BUFFER_UNDERFLOW: |
480 case BUFFER_UNDERFLOW: |
462 // handled implicitly by compaction/reallocation of readBuf |
481 // handled implicitly by compaction/reallocation of readBuf |
|
482 assert dst.position() == 0; |
463 return new EngineResult(sslResult); |
483 return new EngineResult(sslResult); |
464 case OK: |
484 case OK: |
|
485 int size = dst.position(); |
|
486 if (debug.on()) { |
|
487 debugr.log("Decoded " + size + " bytes out of " + len |
|
488 + " into buffer of " + dst.capacity() |
|
489 + " remaining to decode: " + src.remaining()); |
|
490 } |
|
491 // if the record payload was bigger than what was originally |
|
492 // allocated, then sets the adaptiveAppBufferSize to size |
|
493 // and we will use that new size as a guess for the next app |
|
494 // buffer. |
|
495 if (size > adaptiveAppBufferSize) { |
|
496 adaptiveAppBufferSize = ((size + 7) >>> 3) << 3; |
|
497 } |
465 dst.flip(); |
498 dst.flip(); |
466 return new EngineResult(sslResult, dst); |
499 return new EngineResult(sslResult, dst); |
467 } |
500 } |
468 } |
501 } |
469 } |
502 } |
685 errorCommon(ex); |
718 errorCommon(ex); |
686 handleError(ex); |
719 handleError(ex); |
687 } |
720 } |
688 } |
721 } |
689 |
722 |
|
723 // The SSLEngine insists on being given a buffer that is at least |
|
724 // SSLSession.getPacketBufferSize() long (usually 16K). If given |
|
725 // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only |
|
726 // has 6 bytes to wrap. Typical usage shows that for GET we |
|
727 // usually produce an average of ~ 100 bytes. |
|
728 // To avoid wasting space, and because allocating and zeroing |
|
729 // 16K buffers for encoding 6 bytes is costly, we are reusing the |
|
730 // same writeBuffer to interact with SSLEngine.wrap(). |
|
731 // If the SSLEngine produces less than writeBuffer.capacity() / 2, |
|
732 // then we copy off the bytes to a smaller buffer that we send |
|
733 // downstream. Otherwise, we send the writeBuffer downstream |
|
734 // and will allocate a new one next time. |
|
735 volatile ByteBuffer writeBuffer; |
690 @SuppressWarnings("fallthrough") |
736 @SuppressWarnings("fallthrough") |
691 EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { |
737 EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { |
|
738 long len = Utils.remaining(src); |
692 if (debugw.on()) |
739 if (debugw.on()) |
693 debugw.log("wrapping " + Utils.remaining(src) + " bytes"); |
740 debugw.log("wrapping " + len + " bytes"); |
694 ByteBuffer dst = getNetBuffer(); |
741 |
|
742 ByteBuffer dst = writeBuffer; |
|
743 if (dst == null) dst = writeBuffer = getNetBuffer(); |
|
744 assert dst.position() == 0 : "buffer position is " + dst.position(); |
|
745 assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity(); |
|
746 |
695 while (true) { |
747 while (true) { |
696 SSLEngineResult sslResult = engine.wrap(src, dst); |
748 SSLEngineResult sslResult = engine.wrap(src, dst); |
697 if (debugw.on()) debugw.log("SSLResult: " + sslResult); |
749 if (debugw.on()) debugw.log("SSLResult: " + sslResult); |
698 switch (sslResult.getStatus()) { |
750 switch (sslResult.getStatus()) { |
699 case BUFFER_OVERFLOW: |
751 case BUFFER_OVERFLOW: |
700 // Shouldn't happen. We allocated buffer with packet size |
752 // Shouldn't happen. We allocated buffer with packet size |
701 // get it again if net buffer size was changed |
753 // get it again if net buffer size was changed |
702 if (debugw.on()) debugw.log("BUFFER_OVERFLOW"); |
754 if (debugw.on()) debugw.log("BUFFER_OVERFLOW"); |
703 int netSize = packetBufferSize |
755 int netSize = packetBufferSize |
704 = engine.getSession().getPacketBufferSize(); |
756 = engine.getSession().getPacketBufferSize(); |
705 ByteBuffer b = ByteBuffer.allocate(netSize + dst.position()); |
757 ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position()); |
706 dst.flip(); |
758 dst.flip(); |
707 b.put(dst); |
759 b.put(dst); |
708 dst = b; |
760 dst = b; |
709 break; // try again |
761 break; // try again |
710 case CLOSED: |
762 case CLOSED: |
711 if (debugw.on()) debugw.log("CLOSED"); |
763 if (debugw.on()) debugw.log("CLOSED"); |
712 // fallthrough. There could be some remaining data in dst. |
764 // fallthrough. There could be some remaining data in dst. |
713 // CLOSED will be handled by the caller. |
765 // CLOSED will be handled by the caller. |
714 case OK: |
766 case OK: |
715 dst.flip(); |
767 final ByteBuffer dest; |
716 final ByteBuffer dest = dst; |
768 if (dst.position() == 0) { |
|
769 dest = NOTHING; // can happen if handshake is in progress |
|
770 } else if (dst.position() < dst.capacity() / 2) { |
|
771 // less than half the buffer was used. |
|
772 // copy off the bytes to a smaller buffer, and keep |
|
773 // the writeBuffer for next time. |
|
774 dst.flip(); |
|
775 dest = Utils.copyAligned(dst); |
|
776 dst.clear(); |
|
777 } else { |
|
778 // more than half the buffer was used. |
|
779 // just send that buffer downstream, and we will |
|
780 // get a new writeBuffer next time it is needed. |
|
781 dst.flip(); |
|
782 dest = dst; |
|
783 writeBuffer = null; |
|
784 } |
717 if (debugw.on()) |
785 if (debugw.on()) |
718 debugw.log("OK => produced: %d, not wrapped: %d", |
786 debugw.log("OK => produced: %d bytes into %d, not wrapped: %d", |
719 dest.remaining(), Utils.remaining(src)); |
787 dest.remaining(), dest.capacity(), Utils.remaining(src)); |
720 return new EngineResult(sslResult, dest); |
788 return new EngineResult(sslResult, dest); |
721 case BUFFER_UNDERFLOW: |
789 case BUFFER_UNDERFLOW: |
722 // Shouldn't happen. Doesn't returns when wrap() |
790 // Shouldn't happen. Doesn't returns when wrap() |
723 // underflow handled externally |
791 // underflow handled externally |
724 // assert false : "Buffer Underflow"; |
792 // assert false : "Buffer Underflow"; |
797 * and write() functions. |
865 * and write() functions. |
798 */ |
866 */ |
799 private static final int NOT_HANDSHAKING = 0; |
867 private static final int NOT_HANDSHAKING = 0; |
800 private static final int HANDSHAKING = 1; |
868 private static final int HANDSHAKING = 1; |
801 |
869 |
802 private static final int DOING_TASKS = 4; // bit added to above state |
870 // Bit flags |
803 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); |
871 // a thread is currently executing tasks |
|
872 private static final int DOING_TASKS = 4; |
|
873 // a thread wants to execute tasks, while another thread is executing |
|
874 private static final int REQUESTING_TASKS = 8; |
|
875 private static final int TASK_BITS = 12; // Both bits |
804 |
876 |
805 private static final int READER = 1; |
877 private static final int READER = 1; |
806 private static final int WRITER = 2; |
878 private static final int WRITER = 2; |
807 |
879 |
808 private static String states(AtomicInteger state) { |
880 private static String states(AtomicInteger state) { |
809 int s = state.get(); |
881 int s = state.get(); |
810 StringBuilder sb = new StringBuilder(); |
882 StringBuilder sb = new StringBuilder(); |
811 int x = s & ~DOING_TASKS; |
883 int x = s & ~TASK_BITS; |
812 switch (x) { |
884 switch (x) { |
813 case NOT_HANDSHAKING: |
885 case NOT_HANDSHAKING: |
814 sb.append(" NOT_HANDSHAKING "); |
886 sb.append(" NOT_HANDSHAKING "); |
815 break; |
887 break; |
816 case HANDSHAKING: |
888 case HANDSHAKING: |
831 |
905 |
832 final AtomicInteger handshakeState; |
906 final AtomicInteger handshakeState; |
833 final ConcurrentLinkedQueue<String> stateList = |
907 final ConcurrentLinkedQueue<String> stateList = |
834 debug.on() ? new ConcurrentLinkedQueue<>() : null; |
908 debug.on() ? new ConcurrentLinkedQueue<>() : null; |
835 |
909 |
|
910 // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS |
|
911 // depending on previous value |
|
912 private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> { |
|
913 if ((current & DOING_TASKS) == 0) |
|
914 return DOING_TASKS | (current & HANDSHAKING); |
|
915 else |
|
916 return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING); |
|
917 }; |
|
918 |
|
919 // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set |
|
920 // clears bits if not. |
|
921 private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> { |
|
922 if ((current & REQUESTING_TASKS) != 0) |
|
923 return DOING_TASKS | (current & HANDSHAKING); |
|
924 // clear both bits |
|
925 return (current & HANDSHAKING); |
|
926 }; |
|
927 |
836 private boolean doHandshake(EngineResult r, int caller) { |
928 private boolean doHandshake(EngineResult r, int caller) { |
837 // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS |
929 // unconditionally sets the HANDSHAKING bit, while preserving task bits |
838 handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS)); |
930 handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS)); |
839 if (stateList != null && debug.on()) { |
931 if (stateList != null && debug.on()) { |
840 stateList.add(r.handshakeStatus().toString()); |
932 stateList.add(r.handshakeStatus().toString()); |
841 stateList.add(Integer.toString(caller)); |
933 stateList.add(Integer.toString(caller)); |
842 } |
934 } |
843 switch (r.handshakeStatus()) { |
935 switch (r.handshakeStatus()) { |
844 case NEED_TASK: |
936 case NEED_TASK: |
845 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS); |
937 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS); |
846 if ((s & DOING_TASKS) > 0) // someone else was doing tasks |
938 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks |
847 return false; |
939 return false; |
|
940 } |
848 |
941 |
849 if (debug.on()) debug.log("obtaining and initiating task execution"); |
942 if (debug.on()) debug.log("obtaining and initiating task execution"); |
850 List<Runnable> tasks = obtainTasks(); |
943 List<Runnable> tasks = obtainTasks(); |
851 executeTasks(tasks); |
944 executeTasks(tasks); |
852 return false; // executeTasks will resume activity |
945 return false; // executeTasks will resume activity |
876 } |
969 } |
877 return l; |
970 return l; |
878 } |
971 } |
879 |
972 |
880 private void executeTasks(List<Runnable> tasks) { |
973 private void executeTasks(List<Runnable> tasks) { |
881 if (tasks.isEmpty()) |
|
882 return; |
|
883 exec.execute(() -> { |
974 exec.execute(() -> { |
884 try { |
975 try { |
885 List<Runnable> nextTasks = tasks; |
976 List<Runnable> nextTasks = tasks; |
|
977 if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size())); |
886 do { |
978 do { |
887 nextTasks.forEach(Runnable::run); |
979 nextTasks.forEach(Runnable::run); |
888 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { |
980 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { |
889 nextTasks = obtainTasks(); |
981 nextTasks = obtainTasks(); |
890 } else { |
982 } else { |
|
983 int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS); |
|
984 if ((s & DOING_TASKS) != 0) { |
|
985 if (debug.on()) debug.log("re-running tasks (B)"); |
|
986 nextTasks = obtainTasks(); |
|
987 continue; |
|
988 } |
891 break; |
989 break; |
892 } |
990 } |
893 } while (true); |
991 } while (true); |
894 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS); |
992 if (debug.on()) debug.log("finished task execution"); |
895 resumeActivity(); |
993 resumeActivity(); |
896 } catch (Throwable t) { |
994 } catch (Throwable t) { |
897 handleError(t); |
995 handleError(t); |
898 } |
996 } |
899 }); |
997 }); |
995 SSLEngineResult.Status status() { |
1093 SSLEngineResult.Status status() { |
996 return result.getStatus(); |
1094 return result.getStatus(); |
997 } |
1095 } |
998 } |
1096 } |
999 |
1097 |
|
1098 // The maximum network buffer size negotiated during |
|
1099 // the handshake. Usually 16K. |
1000 volatile int packetBufferSize; |
1100 volatile int packetBufferSize; |
1001 final ByteBuffer getNetBuffer() { |
1101 final ByteBuffer getNetBuffer() { |
1002 int netSize = packetBufferSize; |
1102 int netSize = packetBufferSize; |
1003 if (netSize <= 0) { |
1103 if (netSize <= 0) { |
1004 packetBufferSize = netSize = engine.getSession().getPacketBufferSize(); |
1104 packetBufferSize = netSize = engine.getSession().getPacketBufferSize(); |
1005 } |
1105 } |
1006 return ByteBuffer.allocate(netSize); |
1106 return ByteBuffer.allocate(netSize); |
1007 } |
1107 } |
1008 |
1108 |
|
1109 // The maximum application buffer size negotiated during |
|
1110 // the handshake. Usually close to 16K. |
1009 volatile int applicationBufferSize; |
1111 volatile int applicationBufferSize; |
|
1112 // Despite the maximum applicationBufferSize negotiated |
|
1113 // above, TLS records usually have a much smaller payload. |
|
1114 // The adaptiveAppBufferSize records the max payload |
|
1115 // ever decoded, and we use that as a guess for how big |
|
1116 // a buffer we will need for the next payload. |
|
1117 // This avoids allocating and zeroing a 16K buffer for |
|
1118 // nothing... |
|
1119 volatile int adaptiveAppBufferSize; |
1010 final ByteBuffer getAppBuffer() { |
1120 final ByteBuffer getAppBuffer() { |
1011 int appSize = applicationBufferSize; |
1121 int appSize = applicationBufferSize; |
1012 if (appSize <= 0) { |
1122 if (appSize <= 0) { |
1013 applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize(); |
1123 applicationBufferSize = appSize |
1014 } |
1124 = engine.getSession().getApplicationBufferSize(); |
1015 return ByteBuffer.allocate(appSize); |
1125 } |
|
1126 int size = adaptiveAppBufferSize; |
|
1127 if (size <= 0) { |
|
1128 size = 512; // start with 512 this is usually enough for handshaking / headers |
|
1129 } else if (size > appSize) { |
|
1130 size = appSize; |
|
1131 } |
|
1132 // will cause a BUFFER_OVERFLOW if not big enough, but |
|
1133 // that's OK. |
|
1134 return ByteBuffer.allocate(size); |
1016 } |
1135 } |
1017 |
1136 |
1018 final String dbgString() { |
1137 final String dbgString() { |
1019 return "SSLFlowDelegate(" + tubeName + ")"; |
1138 return "SSLFlowDelegate(" + tubeName + ")"; |
1020 } |
1139 } |