31 import javax.net.ssl.SSLEngineResult; |
31 import javax.net.ssl.SSLEngineResult; |
32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; |
32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; |
33 import javax.net.ssl.SSLEngineResult.Status; |
33 import javax.net.ssl.SSLEngineResult.Status; |
34 import javax.net.ssl.SSLException; |
34 import javax.net.ssl.SSLException; |
35 import java.io.IOException; |
35 import java.io.IOException; |
|
36 import java.lang.ref.Reference; |
|
37 import java.lang.ref.ReferenceQueue; |
|
38 import java.lang.ref.WeakReference; |
36 import java.nio.ByteBuffer; |
39 import java.nio.ByteBuffer; |
37 import java.util.ArrayList; |
40 import java.util.ArrayList; |
38 import java.util.Collections; |
41 import java.util.Collections; |
39 import java.util.Iterator; |
42 import java.util.Iterator; |
40 import java.util.LinkedList; |
43 import java.util.LinkedList; |
91 private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
94 private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
92 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); |
95 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); |
93 // When handshake is in progress trying to wrap may produce no bytes. |
96 // When handshake is in progress trying to wrap may produce no bytes. |
94 private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); |
97 private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); |
95 private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); |
98 private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); |
|
99 private static final boolean isMonitored = |
|
100 monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")); |
96 |
101 |
97 final Executor exec; |
102 final Executor exec; |
98 final Reader reader; |
103 final Reader reader; |
99 final Writer writer; |
104 final Writer writer; |
100 final SSLEngine engine; |
105 final SSLEngine engine; |
101 final String tubeName; // hack |
106 final String tubeName; // hack |
102 final CompletableFuture<String> alpnCF; // completes on initial handshake |
107 final CompletableFuture<String> alpnCF; // completes on initial handshake |
|
108 final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped |
103 volatile boolean close_notify_received; |
109 volatile boolean close_notify_received; |
104 final CompletableFuture<Void> readerCF; |
110 final CompletableFuture<Void> readerCF; |
105 final CompletableFuture<Void> writerCF; |
111 final CompletableFuture<Void> writerCF; |
106 final Consumer<ByteBuffer> recycler; |
112 final Consumer<ByteBuffer> recycler; |
107 static AtomicInteger scount = new AtomicInteger(1); |
113 static AtomicInteger scount = new AtomicInteger(1); |
150 |
156 |
151 // connect the Reader to the downReader and the |
157 // connect the Reader to the downReader and the |
152 // Writer to the downWriter. |
158 // Writer to the downWriter. |
153 connect(downReader, downWriter); |
159 connect(downReader, downWriter); |
154 |
160 |
155 if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"))) |
161 if (isMonitored) Monitor.add(monitor); |
156 Monitor.add(this::monitor); |
|
157 } |
162 } |
158 |
163 |
159 /** |
164 /** |
160 * Returns true if the SSLFlowDelegate has detected a TLS |
165 * Returns true if the SSLFlowDelegate has detected a TLS |
161 * close_notify from the server. |
166 * close_notify from the server. |
200 } |
205 } |
201 |
206 |
202 public String monitor() { |
207 public String monitor() { |
203 StringBuilder sb = new StringBuilder(); |
208 StringBuilder sb = new StringBuilder(); |
204 sb.append("SSL: id ").append(id); |
209 sb.append("SSL: id ").append(id); |
|
210 sb.append(" ").append(dbgString()); |
205 sb.append(" HS state: " + states(handshakeState)); |
211 sb.append(" HS state: " + states(handshakeState)); |
206 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); |
212 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); |
207 if (stateList != null) { |
213 if (stateList != null) { |
208 sb.append(" LL : "); |
214 sb.append(" LL : "); |
209 for (String s : stateList) { |
215 for (String s : stateList) { |
291 scheduler.runOrSchedule(exec); |
297 scheduler.runOrSchedule(exec); |
292 } |
298 } |
293 |
299 |
294 @Override |
300 @Override |
295 public String toString() { |
301 public String toString() { |
296 return "READER: " + super.toString() + " readBuf: " + readBuf.toString() |
302 return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() |
297 + " count: " + count.toString(); |
303 + ", count: " + count.toString() + ", scheduler: " |
|
304 + (scheduler.isStopped() ? "stopped" : "running") |
|
305 + ", status: " + lastUnwrapStatus; |
298 } |
306 } |
299 |
307 |
300 private void reallocReadBuf() { |
308 private void reallocReadBuf() { |
301 int sz = readBuf.capacity(); |
309 int sz = readBuf.capacity(); |
302 ByteBuffer newb = ByteBuffer.allocate(sz * 2); |
310 ByteBuffer newb = ByteBuffer.allocate(sz * 2); |
393 if (result.status() == Status.BUFFER_UNDERFLOW) { |
402 if (result.status() == Status.BUFFER_UNDERFLOW) { |
394 if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); |
403 if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); |
395 // not enough data in the read buffer... |
404 // not enough data in the read buffer... |
396 // no need to try to unwrap again unless we get more bytes |
405 // no need to try to unwrap again unless we get more bytes |
397 // than minBytesRequired = len in the read buffer. |
406 // than minBytesRequired = len in the read buffer. |
398 minBytesRequired = len; |
|
399 synchronized (readBufferLock) { |
407 synchronized (readBufferLock) { |
|
408 minBytesRequired = len; |
400 // more bytes could already have been added... |
409 // more bytes could already have been added... |
401 assert readBuf.remaining() >= len; |
410 assert readBuf.remaining() >= len; |
402 // check if we have received some data, and if so |
411 // check if we have received some data, and if so |
403 // we can just re-spin the loop |
412 // we can just re-spin the loop |
404 if (readBuf.remaining() > len) continue; |
413 if (readBuf.remaining() > len) continue; |
|
414 else if (this.completing) { |
|
415 if (debug.on()) { |
|
416 debugr.log("BUFFER_UNDERFLOW with EOF," + |
|
417 " %d bytes non decrypted.", len); |
|
418 } |
|
419 // The channel won't send us any more data, and |
|
420 // we are in underflow: we need to fail. |
|
421 throw new IOException("BUFFER_UNDERFLOW with EOF, " |
|
422 + len + " bytes non decrypted."); |
|
423 } |
405 } |
424 } |
406 // request more data and return. |
425 // request more data and return. |
407 requestMore(); |
426 requestMore(); |
408 return; |
427 return; |
409 } |
428 } |
450 errorCommon(ex); |
470 errorCommon(ex); |
451 handleError(ex); |
471 handleError(ex); |
452 } |
472 } |
453 } |
473 } |
454 |
474 |
|
475 private volatile Status lastUnwrapStatus; |
455 EngineResult unwrapBuffer(ByteBuffer src) throws IOException { |
476 EngineResult unwrapBuffer(ByteBuffer src) throws IOException { |
456 ByteBuffer dst = getAppBuffer(); |
477 ByteBuffer dst = getAppBuffer(); |
457 int len = src.remaining(); |
478 int len = src.remaining(); |
458 while (true) { |
479 while (true) { |
459 SSLEngineResult sslResult = engine.unwrap(src, dst); |
480 SSLEngineResult sslResult = engine.unwrap(src, dst); |
460 switch (sslResult.getStatus()) { |
481 switch (lastUnwrapStatus = sslResult.getStatus()) { |
461 case BUFFER_OVERFLOW: |
482 case BUFFER_OVERFLOW: |
462 // may happen if app size buffer was changed, or if |
483 // may happen if app size buffer was changed, or if |
463 // our 'adaptiveBufferSize' guess was too small for |
484 // our 'adaptiveBufferSize' guess was too small for |
464 // the current payload. In that case, update the |
485 // the current payload. In that case, update the |
465 // value of applicationBufferSize, and allocate a |
486 // value of applicationBufferSize, and allocate a |
505 public interface Monitorable { |
526 public interface Monitorable { |
506 public String getInfo(); |
527 public String getInfo(); |
507 } |
528 } |
508 |
529 |
509 public static class Monitor extends Thread { |
530 public static class Monitor extends Thread { |
510 final List<Monitorable> list; |
531 final List<WeakReference<Monitorable>> list; |
|
532 final List<FinalMonitorable> finalList; |
|
533 final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>(); |
511 static Monitor themon; |
534 static Monitor themon; |
512 |
535 |
513 static { |
536 static { |
514 themon = new Monitor(); |
537 themon = new Monitor(); |
515 themon.start(); // uncomment to enable Monitor |
538 themon.start(); // uncomment to enable Monitor |
|
539 } |
|
540 |
|
541 // An instance used to temporarily store the |
|
542 // last observable state of a monitorable object. |
|
543 // When Monitor.remove(o) is called, we replace |
|
544 // 'o' with a FinalMonitorable whose reference |
|
545 // will be enqueued after the last observable state |
|
546 // has been printed. |
|
547 final class FinalMonitorable implements Monitorable { |
|
548 final String finalState; |
|
549 FinalMonitorable(Monitorable o) { |
|
550 finalState = o.getInfo(); |
|
551 finalList.add(this); |
|
552 } |
|
553 @Override |
|
554 public String getInfo() { |
|
555 finalList.remove(this); |
|
556 return finalState; |
|
557 } |
516 } |
558 } |
517 |
559 |
518 Monitor() { |
560 Monitor() { |
519 super("Monitor"); |
561 super("Monitor"); |
520 setDaemon(true); |
562 setDaemon(true); |
521 list = Collections.synchronizedList(new LinkedList<>()); |
563 list = Collections.synchronizedList(new LinkedList<>()); |
|
564 finalList = new ArrayList<>(); // access is synchronized on list above |
522 } |
565 } |
523 |
566 |
524 void addTarget(Monitorable o) { |
567 void addTarget(Monitorable o) { |
525 list.add(o); |
568 list.add(new WeakReference<>(o, queue)); |
|
569 } |
|
570 void removeTarget(Monitorable o) { |
|
571 // It can take a long time for GC to clean up references. |
|
572 // Calling Monitor.remove() early helps removing noise from the |
|
573 // logs/ |
|
574 synchronized (list) { |
|
575 Iterator<WeakReference<Monitorable>> it = list.iterator(); |
|
576 while (it.hasNext()) { |
|
577 Monitorable m = it.next().get(); |
|
578 if (m == null) it.remove(); |
|
579 if (o == m) { |
|
580 it.remove(); |
|
581 break; |
|
582 } |
|
583 } |
|
584 FinalMonitorable m = new FinalMonitorable(o); |
|
585 addTarget(m); |
|
586 Reference.reachabilityFence(m); |
|
587 } |
526 } |
588 } |
527 |
589 |
528 public static void add(Monitorable o) { |
590 public static void add(Monitorable o) { |
529 themon.addTarget(o); |
591 themon.addTarget(o); |
|
592 } |
|
593 public static void remove(Monitorable o) { |
|
594 themon.removeTarget(o); |
530 } |
595 } |
531 |
596 |
532 @Override |
597 @Override |
533 public void run() { |
598 public void run() { |
534 System.out.println("Monitor starting"); |
599 System.out.println("Monitor starting"); |
535 try { |
600 try { |
536 while (true) { |
601 while (true) { |
537 Thread.sleep(20 * 1000); |
602 Thread.sleep(20 * 1000); |
538 synchronized (list) { |
603 synchronized (list) { |
539 for (Monitorable o : list) { |
604 Reference<? extends Monitorable> expired; |
|
605 while ((expired = queue.poll()) != null) list.remove(expired); |
|
606 for (WeakReference<Monitorable> ref : list) { |
|
607 Monitorable o = ref.get(); |
|
608 if (o == null) continue; |
|
609 if (o instanceof FinalMonitorable) { |
|
610 ref.enqueue(); |
|
611 } |
540 System.out.println(o.getInfo()); |
612 System.out.println(o.getInfo()); |
541 System.out.println("-------------------------"); |
613 System.out.println("-------------------------"); |
542 } |
614 } |
543 } |
615 } |
544 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-"); |
616 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-"); |
731 // If the SSLEngine produces less than writeBuffer.capacity() / 2, |
803 // If the SSLEngine produces less than writeBuffer.capacity() / 2, |
732 // then we copy off the bytes to a smaller buffer that we send |
804 // then we copy off the bytes to a smaller buffer that we send |
733 // downstream. Otherwise, we send the writeBuffer downstream |
805 // downstream. Otherwise, we send the writeBuffer downstream |
734 // and will allocate a new one next time. |
806 // and will allocate a new one next time. |
735 volatile ByteBuffer writeBuffer; |
807 volatile ByteBuffer writeBuffer; |
|
808 private volatile Status lastWrappedStatus; |
736 @SuppressWarnings("fallthrough") |
809 @SuppressWarnings("fallthrough") |
737 EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { |
810 EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { |
738 long len = Utils.remaining(src); |
811 long len = Utils.remaining(src); |
739 if (debugw.on()) |
812 if (debugw.on()) |
740 debugw.log("wrapping " + len + " bytes"); |
813 debugw.log("wrapping " + len + " bytes"); |
745 assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity(); |
818 assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity(); |
746 |
819 |
747 while (true) { |
820 while (true) { |
748 SSLEngineResult sslResult = engine.wrap(src, dst); |
821 SSLEngineResult sslResult = engine.wrap(src, dst); |
749 if (debugw.on()) debugw.log("SSLResult: " + sslResult); |
822 if (debugw.on()) debugw.log("SSLResult: " + sslResult); |
750 switch (sslResult.getStatus()) { |
823 switch (lastWrappedStatus = sslResult.getStatus()) { |
751 case BUFFER_OVERFLOW: |
824 case BUFFER_OVERFLOW: |
752 // Shouldn't happen. We allocated buffer with packet size |
825 // Shouldn't happen. We allocated buffer with packet size |
753 // get it again if net buffer size was changed |
826 // get it again if net buffer size was changed |
754 if (debugw.on()) debugw.log("BUFFER_OVERFLOW"); |
827 if (debugw.on()) debugw.log("BUFFER_OVERFLOW"); |
755 int netSize = packetBufferSize |
828 int netSize = packetBufferSize |