351 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
285 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
352 publisher.subscribe(subscriber); |
286 publisher.subscribe(subscriber); |
353 } |
287 } |
354 } |
288 } |
355 |
289 |
356 private static void sleep(long millis) { |
|
357 try { |
|
358 Thread.sleep(millis); |
|
359 } catch (InterruptedException e) { |
|
360 |
|
361 } |
|
362 } |
|
363 |
|
364 /** |
|
365 * Creates a cross-over FlowTube than can be plugged into a client-side |
|
366 * SSLTube (in place of the SSLLoopbackSubscriber). |
|
367 * Note that the only method that can be called on the return tube |
|
368 * is connectFlows(). Calling any other method will trigger an |
|
369 * InternalError. |
|
370 * @param sslExecutor an executor |
|
371 * @return a cross-over FlowTube connected to an EchoTube. |
|
372 * @throws IOException |
|
373 */ |
|
374 FlowTube crossOverEchoServer(Executor sslExecutor) throws IOException { |
|
375 LateBindingTube crossOver = new LateBindingTube(); |
|
376 FlowTube server = new SSLTube(createSSLEngine(false), |
|
377 sslExecutor, |
|
378 crossOver); |
|
379 EchoTube echo = new EchoTube(6); |
|
380 server.connectFlows(FlowTube.asTubePublisher(echo), FlowTube.asTubeSubscriber(echo)); |
|
381 |
|
382 return new CrossOverTube(crossOver); |
|
383 } |
|
384 |
|
385 /** |
|
386 * A cross-over FlowTube that makes it possible to reverse the direction |
|
387 * of flows. The typical usage is to connect an two opposite SSLTube, |
|
388 * one encrypting, one decrypting, to e.g. an EchoTube, with the help |
|
389 * of a LateBindingTube: |
|
390 * {@code |
|
391 * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube |
|
392 * } |
|
393 * <p> |
|
394 * Note that the only method that can be called on the CrossOverTube is |
|
395 * connectFlows(). Calling any other method will cause an InternalError to |
|
396 * be thrown. |
|
397 * Also connectFlows() can be called only once. |
|
398 */ |
|
399 private static final class CrossOverTube implements FlowTube { |
|
400 final LateBindingTube tube; |
|
401 CrossOverTube(LateBindingTube tube) { |
|
402 this.tube = tube; |
|
403 } |
|
404 |
|
405 @Override |
|
406 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
|
407 throw newInternalError(); |
|
408 } |
|
409 |
|
410 @Override |
|
411 public void connectFlows(TubePublisher writePublisher, TubeSubscriber readSubscriber) { |
|
412 tube.start(writePublisher, readSubscriber); |
|
413 } |
|
414 |
|
415 @Override |
|
416 public boolean isFinished() { |
|
417 return tube.isFinished(); |
|
418 } |
|
419 |
|
420 Error newInternalError() { |
|
421 InternalError error = new InternalError(); |
|
422 error.printStackTrace(System.out); |
|
423 return error; |
|
424 } |
|
425 |
|
426 @Override |
|
427 public void onSubscribe(Flow.Subscription subscription) { |
|
428 throw newInternalError(); |
|
429 } |
|
430 |
|
431 @Override |
|
432 public void onError(Throwable throwable) { |
|
433 throw newInternalError(); |
|
434 } |
|
435 |
|
436 @Override |
|
437 public void onComplete() { |
|
438 throw newInternalError(); |
|
439 } |
|
440 |
|
441 @Override |
|
442 public void onNext(List<ByteBuffer> item) { |
|
443 throw newInternalError(); |
|
444 } |
|
445 } |
|
446 |
|
447 /** |
|
448 * A late binding tube that makes it possible to create an |
|
449 * SSLTube before the right-hand-side tube has been created. |
|
450 * The typical usage is to make it possible to connect two |
|
451 * opposite SSLTube (one encrypting, one decrypting) through a |
|
452 * CrossOverTube: |
|
453 * {@code |
|
454 * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube |
|
455 * } |
|
456 * <p> |
|
457 * Note that this class only supports a single call to start(): it cannot be |
|
458 * subscribed more than once from its left-hand-side (the cross over tube side). |
|
459 */ |
|
460 private static class LateBindingTube implements FlowTube { |
|
461 |
|
462 final CompletableFuture<Flow.Publisher<List<ByteBuffer>>> futurePublisher |
|
463 = new CompletableFuture<>(); |
|
464 final ConcurrentLinkedQueue<Consumer<Flow.Subscriber<? super List<ByteBuffer>>>> queue |
|
465 = new ConcurrentLinkedQueue<>(); |
|
466 AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>(); |
|
467 SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop); |
|
468 AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
|
469 private volatile boolean finished; |
|
470 private volatile boolean completed; |
|
471 |
|
472 |
|
473 public void start(Flow.Publisher<List<ByteBuffer>> publisher, |
|
474 Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
|
475 subscriberRef.set(subscriber); |
|
476 futurePublisher.complete(publisher); |
|
477 scheduler.runOrSchedule(); |
|
478 } |
|
479 |
|
480 @Override |
|
481 public boolean isFinished() { |
|
482 return finished; |
|
483 } |
|
484 |
|
485 @Override |
|
486 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
|
487 futurePublisher.thenAccept((p) -> p.subscribe(subscriber)); |
|
488 scheduler.runOrSchedule(); |
|
489 } |
|
490 |
|
491 @Override |
|
492 public void onSubscribe(Flow.Subscription subscription) { |
|
493 queue.add((s) -> s.onSubscribe(subscription)); |
|
494 scheduler.runOrSchedule(); |
|
495 } |
|
496 |
|
497 @Override |
|
498 public void onNext(List<ByteBuffer> item) { |
|
499 queue.add((s) -> s.onNext(item)); |
|
500 scheduler.runOrSchedule(); |
|
501 } |
|
502 |
|
503 @Override |
|
504 public void onError(Throwable throwable) { |
|
505 System.out.println("LateBindingTube onError"); |
|
506 throwable.printStackTrace(System.out); |
|
507 queue.add((s) -> { |
|
508 errorRef.compareAndSet(null, throwable); |
|
509 try { |
|
510 System.out.println("LateBindingTube subscriber onError: " + throwable); |
|
511 s.onError(errorRef.get()); |
|
512 } finally { |
|
513 finished = true; |
|
514 System.out.println("LateBindingTube finished"); |
|
515 } |
|
516 }); |
|
517 scheduler.runOrSchedule(); |
|
518 } |
|
519 |
|
520 @Override |
|
521 public void onComplete() { |
|
522 System.out.println("LateBindingTube completing"); |
|
523 queue.add((s) -> { |
|
524 completed = true; |
|
525 try { |
|
526 System.out.println("LateBindingTube complete subscriber"); |
|
527 s.onComplete(); |
|
528 } finally { |
|
529 finished = true; |
|
530 System.out.println("LateBindingTube finished"); |
|
531 } |
|
532 }); |
|
533 scheduler.runOrSchedule(); |
|
534 } |
|
535 |
|
536 private void loop() { |
|
537 if (finished) { |
|
538 scheduler.stop(); |
|
539 return; |
|
540 } |
|
541 Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get(); |
|
542 if (subscriber == null) return; |
|
543 try { |
|
544 Consumer<Flow.Subscriber<? super List<ByteBuffer>>> s; |
|
545 while ((s = queue.poll()) != null) { |
|
546 s.accept(subscriber); |
|
547 } |
|
548 } catch (Throwable t) { |
|
549 if (errorRef.compareAndSet(null, t)) { |
|
550 onError(t); |
|
551 } |
|
552 } |
|
553 } |
|
554 } |
|
555 |
|
556 /** |
|
557 * An echo tube that just echoes back whatever bytes it receives. |
|
558 * This cannot be plugged to the right-hand-side of an SSLTube |
|
559 * since handshake data cannot be simply echoed back, and |
|
560 * application data most likely also need to be decrypted and |
|
561 * re-encrypted. |
|
562 */ |
|
563 private static final class EchoTube implements FlowTube { |
|
564 |
|
565 private final static Object EOF = new Object(); |
|
566 private final Executor executor = Executors.newSingleThreadExecutor(); |
|
567 |
|
568 private final Queue<Object> queue = new ConcurrentLinkedQueue<>(); |
|
569 private final int maxQueueSize; |
|
570 private final SequentialScheduler processingScheduler = |
|
571 new SequentialScheduler(createProcessingTask()); |
|
572 |
|
573 /* Writing into this tube */ |
|
574 private volatile long requested; |
|
575 private Flow.Subscription subscription; |
|
576 |
|
577 /* Reading from this tube */ |
|
578 private final Demand demand = new Demand(); |
|
579 private final AtomicBoolean cancelled = new AtomicBoolean(); |
|
580 private Flow.Subscriber<? super List<ByteBuffer>> subscriber; |
|
581 |
|
582 private EchoTube(int maxBufferSize) { |
|
583 if (maxBufferSize < 1) |
|
584 throw new IllegalArgumentException(); |
|
585 this.maxQueueSize = maxBufferSize; |
|
586 } |
|
587 |
|
588 @Override |
|
589 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
|
590 this.subscriber = subscriber; |
|
591 System.out.println("EchoTube got subscriber: " + subscriber); |
|
592 this.subscriber.onSubscribe(new InternalSubscription()); |
|
593 } |
|
594 |
|
595 @Override |
|
596 public void onSubscribe(Flow.Subscription subscription) { |
|
597 System.out.println("EchoTube request: " + maxQueueSize); |
|
598 (this.subscription = subscription).request(requested = maxQueueSize); |
|
599 } |
|
600 |
|
601 private void requestMore() { |
|
602 Flow.Subscription s = subscription; |
|
603 if (s == null || cancelled.get()) return; |
|
604 long unfulfilled = queue.size() + --requested; |
|
605 if (unfulfilled <= maxQueueSize/2) { |
|
606 long req = maxQueueSize - unfulfilled; |
|
607 requested += req; |
|
608 s.request(req); |
|
609 System.out.printf("EchoTube request: %s [requested:%s, queue:%s, unfulfilled:%s]%n", |
|
610 req, requested-req, queue.size(), unfulfilled ); |
|
611 } |
|
612 } |
|
613 |
|
614 @Override |
|
615 public void onNext(List<ByteBuffer> item) { |
|
616 System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n", |
|
617 Utils.remaining(item), requested, queue.size()); |
|
618 queue.add(item); |
|
619 processingScheduler.deferOrSchedule(executor); |
|
620 } |
|
621 |
|
622 @Override |
|
623 public void onError(Throwable throwable) { |
|
624 System.out.println("EchoTube add " + throwable); |
|
625 queue.add(throwable); |
|
626 processingScheduler.deferOrSchedule(executor); |
|
627 } |
|
628 |
|
629 @Override |
|
630 public void onComplete() { |
|
631 System.out.println("EchoTube add EOF"); |
|
632 queue.add(EOF); |
|
633 processingScheduler.deferOrSchedule(executor); |
|
634 } |
|
635 |
|
636 @Override |
|
637 public boolean isFinished() { |
|
638 return cancelled.get(); |
|
639 } |
|
640 |
|
641 private class InternalSubscription implements Flow.Subscription { |
|
642 |
|
643 @Override |
|
644 public void request(long n) { |
|
645 System.out.println("EchoTube got request: " + n); |
|
646 if (n <= 0) { |
|
647 throw new InternalError(); |
|
648 } |
|
649 if (demand.increase(n)) { |
|
650 processingScheduler.deferOrSchedule(executor); |
|
651 } |
|
652 } |
|
653 |
|
654 @Override |
|
655 public void cancel() { |
|
656 cancelled.set(true); |
|
657 } |
|
658 } |
|
659 |
|
660 @Override |
|
661 public String toString() { |
|
662 return "EchoTube"; |
|
663 } |
|
664 |
|
665 int transmitted = 0; |
|
666 private SequentialScheduler.RestartableTask createProcessingTask() { |
|
667 return new SequentialScheduler.CompleteRestartableTask() { |
|
668 |
|
669 @Override |
|
670 protected void run() { |
|
671 try { |
|
672 while (!cancelled.get()) { |
|
673 Object item = queue.peek(); |
|
674 if (item == null) { |
|
675 System.out.printf("EchoTube: queue empty, requested=%s, demand=%s, transmitted=%s%n", |
|
676 requested, demand.get(), transmitted); |
|
677 requestMore(); |
|
678 return; |
|
679 } |
|
680 try { |
|
681 System.out.printf("EchoTube processing item, requested=%s, demand=%s, transmitted=%s%n", |
|
682 requested, demand.get(), transmitted); |
|
683 if (item instanceof List) { |
|
684 if (!demand.tryDecrement()) { |
|
685 System.out.println("EchoTube no demand"); |
|
686 return; |
|
687 } |
|
688 @SuppressWarnings("unchecked") |
|
689 List<ByteBuffer> bytes = (List<ByteBuffer>) item; |
|
690 Object removed = queue.remove(); |
|
691 assert removed == item; |
|
692 System.out.println("EchoTube processing " |
|
693 + Utils.remaining(bytes)); |
|
694 transmitted++; |
|
695 subscriber.onNext(bytes); |
|
696 requestMore(); |
|
697 } else if (item instanceof Throwable) { |
|
698 cancelled.set(true); |
|
699 Object removed = queue.remove(); |
|
700 assert removed == item; |
|
701 System.out.println("EchoTube processing " + item); |
|
702 subscriber.onError((Throwable) item); |
|
703 } else if (item == EOF) { |
|
704 cancelled.set(true); |
|
705 Object removed = queue.remove(); |
|
706 assert removed == item; |
|
707 System.out.println("EchoTube processing EOF"); |
|
708 subscriber.onComplete(); |
|
709 } else { |
|
710 throw new InternalError(String.valueOf(item)); |
|
711 } |
|
712 } finally { |
|
713 } |
|
714 } |
|
715 } catch(Throwable t) { |
|
716 t.printStackTrace(); |
|
717 throw t; |
|
718 } |
|
719 } |
|
720 }; |
|
721 } |
|
722 } |
|
723 |
|
724 /** |
|
725 * The final subscriber which receives the decrypted looped-back data. Just |
|
726 * needs to compare the data with what was sent. The given CF is either |
|
727 * completed exceptionally with an error or normally on success. |
|
728 */ |
|
729 private static class EndSubscriber implements FlowTube.TubeSubscriber { |
|
730 |
|
731 private static final int REQUEST_WINDOW = 13; |
|
732 |
|
733 private final long nbytes; |
|
734 private final AtomicLong counter = new AtomicLong(); |
|
735 private final CompletableFuture<?> completion; |
|
736 private volatile Flow.Subscription subscription; |
|
737 private long unfulfilled; |
|
738 |
|
739 EndSubscriber(long nbytes, CompletableFuture<?> completion) { |
|
740 this.nbytes = nbytes; |
|
741 this.completion = completion; |
|
742 } |
|
743 |
|
744 @Override |
|
745 public void onSubscribe(Flow.Subscription subscription) { |
|
746 this.subscription = subscription; |
|
747 unfulfilled = REQUEST_WINDOW; |
|
748 System.out.println("EndSubscriber request " + REQUEST_WINDOW); |
|
749 subscription.request(REQUEST_WINDOW); |
|
750 } |
|
751 |
|
752 public static String info(List<ByteBuffer> i) { |
|
753 StringBuilder sb = new StringBuilder(); |
|
754 sb.append("size: ").append(Integer.toString(i.size())); |
|
755 int x = 0; |
|
756 for (ByteBuffer b : i) |
|
757 x += b.remaining(); |
|
758 sb.append(" bytes: ").append(x); |
|
759 return sb.toString(); |
|
760 } |
|
761 |
|
762 @Override |
|
763 public void onNext(List<ByteBuffer> buffers) { |
|
764 if (--unfulfilled == (REQUEST_WINDOW / 2)) { |
|
765 long req = REQUEST_WINDOW - unfulfilled; |
|
766 System.out.println("EndSubscriber request " + req); |
|
767 unfulfilled = REQUEST_WINDOW; |
|
768 subscription.request(req); |
|
769 } |
|
770 |
|
771 long currval = counter.get(); |
|
772 if (currval % 500 == 0) { |
|
773 System.out.println("EndSubscriber: " + currval); |
|
774 } |
|
775 System.out.println("EndSubscriber onNext " + Utils.remaining(buffers)); |
|
776 |
|
777 for (ByteBuffer buf : buffers) { |
|
778 while (buf.hasRemaining()) { |
|
779 long n = buf.getLong(); |
|
780 if (currval > (SSLTubeTest.TOTAL_LONGS - 50)) { |
|
781 System.out.println("End: " + currval); |
|
782 } |
|
783 if (n != currval++) { |
|
784 System.out.println("ERROR at " + n + " != " + (currval - 1)); |
|
785 completion.completeExceptionally(new RuntimeException("ERROR")); |
|
786 subscription.cancel(); |
|
787 return; |
|
788 } |
|
789 } |
|
790 } |
|
791 |
|
792 counter.set(currval); |
|
793 } |
|
794 |
|
795 @Override |
|
796 public void onError(Throwable throwable) { |
|
797 System.out.println("EndSubscriber onError " + throwable); |
|
798 completion.completeExceptionally(throwable); |
|
799 } |
|
800 |
|
801 @Override |
|
802 public void onComplete() { |
|
803 long n = counter.get(); |
|
804 if (n != nbytes) { |
|
805 System.out.printf("nbytes=%d n=%d\n", nbytes, n); |
|
806 completion.completeExceptionally(new RuntimeException("ERROR AT END")); |
|
807 } else { |
|
808 System.out.println("DONE OK"); |
|
809 completion.complete(null); |
|
810 } |
|
811 } |
|
812 @Override |
|
813 public String toString() { |
|
814 return "EndSubscriber"; |
|
815 } |
|
816 } |
|
817 |
|
818 private static SSLEngine createSSLEngine(boolean client) throws IOException { |
|
819 SSLContext context = (new SimpleSSLContext()).get(); |
|
820 SSLEngine engine = context.createSSLEngine(); |
|
821 SSLParameters params = context.getSupportedSSLParameters(); |
|
822 params.setProtocols(new String[]{"TLSv1.2"}); // TODO: This is essential. Needs to be protocol impl |
|
823 if (client) { |
|
824 params.setApplicationProtocols(new String[]{"proto1", "proto2"}); // server will choose proto2 |
|
825 } else { |
|
826 params.setApplicationProtocols(new String[]{"proto2"}); // server will choose proto2 |
|
827 } |
|
828 engine.setSSLParameters(params); |
|
829 engine.setUseClientMode(client); |
|
830 return engine; |
|
831 } |
|
832 |
|
833 /** |
|
834 * Creates a simple usable SSLContext for SSLSocketFactory or a HttpsServer |
|
835 * using either a given keystore or a default one in the test tree. |
|
836 * |
|
837 * Using this class with a security manager requires the following |
|
838 * permissions to be granted: |
|
839 * |
|
840 * permission "java.util.PropertyPermission" "test.src.path", "read"; |
|
841 * permission java.io.FilePermission "${test.src}/../../../../lib/testlibrary/jdk/testlibrary/testkeys", |
|
842 * "read"; The exact path above depends on the location of the test. |
|
843 */ |
|
844 private static class SimpleSSLContext { |
|
845 |
|
846 private final SSLContext ssl; |
|
847 |
|
848 /** |
|
849 * Loads default keystore from SimpleSSLContext source directory |
|
850 */ |
|
851 public SimpleSSLContext() throws IOException { |
|
852 String paths = System.getProperty("test.src.path"); |
|
853 StringTokenizer st = new StringTokenizer(paths, File.pathSeparator); |
|
854 boolean securityExceptions = false; |
|
855 SSLContext sslContext = null; |
|
856 while (st.hasMoreTokens()) { |
|
857 String path = st.nextToken(); |
|
858 try { |
|
859 File f = new File(path, "../../../../lib/testlibrary/jdk/testlibrary/testkeys"); |
|
860 if (f.exists()) { |
|
861 try (FileInputStream fis = new FileInputStream(f)) { |
|
862 sslContext = init(fis); |
|
863 break; |
|
864 } |
|
865 } |
|
866 } catch (SecurityException e) { |
|
867 // catch and ignore because permission only required |
|
868 // for one entry on path (at most) |
|
869 securityExceptions = true; |
|
870 } |
|
871 } |
|
872 if (securityExceptions) { |
|
873 System.err.println("SecurityExceptions thrown on loading testkeys"); |
|
874 } |
|
875 ssl = sslContext; |
|
876 } |
|
877 |
|
878 private SSLContext init(InputStream i) throws IOException { |
|
879 try { |
|
880 char[] passphrase = "passphrase".toCharArray(); |
|
881 KeyStore ks = KeyStore.getInstance("JKS"); |
|
882 ks.load(i, passphrase); |
|
883 |
|
884 KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); |
|
885 kmf.init(ks, passphrase); |
|
886 |
|
887 TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); |
|
888 tmf.init(ks); |
|
889 |
|
890 SSLContext ssl = SSLContext.getInstance("TLS"); |
|
891 ssl.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); |
|
892 return ssl; |
|
893 } catch (KeyManagementException | KeyStoreException | |
|
894 UnrecoverableKeyException | CertificateException | |
|
895 NoSuchAlgorithmException e) { |
|
896 throw new RuntimeException(e.getMessage()); |
|
897 } |
|
898 } |
|
899 |
|
900 public SSLContext get() { |
|
901 return ssl; |
|
902 } |
|
903 } |
|
904 } |
290 } |