67 import java.util.concurrent.LinkedBlockingQueue; |
68 import java.util.concurrent.LinkedBlockingQueue; |
68 import java.util.concurrent.SubmissionPublisher; |
69 import java.util.concurrent.SubmissionPublisher; |
69 import java.util.concurrent.atomic.AtomicBoolean; |
70 import java.util.concurrent.atomic.AtomicBoolean; |
70 import java.util.concurrent.atomic.AtomicInteger; |
71 import java.util.concurrent.atomic.AtomicInteger; |
71 import java.util.concurrent.atomic.AtomicLong; |
72 import java.util.concurrent.atomic.AtomicLong; |
|
73 import java.util.concurrent.atomic.AtomicReference; |
|
74 import java.util.function.Consumer; |
72 |
75 |
73 @Test |
76 @Test |
74 public class SSLTubeTest { |
77 public class SSLTubeTest { |
75 |
78 |
76 private static final long COUNTER = 600; |
79 private static final long COUNTER = 600; |
77 private static final int LONGS_PER_BUF = 800; |
80 private static final int LONGS_PER_BUF = 800; |
78 private static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF; |
81 private static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF; |
|
82 public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0); |
|
83 |
|
84 static final Random rand = new Random(); |
|
85 |
|
86 static int randomRange(int lower, int upper) { |
|
87 if (lower > upper) |
|
88 throw new IllegalArgumentException("lower > upper"); |
|
89 int diff = upper - lower; |
|
90 int r = lower + rand.nextInt(diff); |
|
91 return r - (r % 8); // round down to multiple of 8 (align for longs) |
|
92 } |
79 |
93 |
80 private static ByteBuffer getBuffer(long startingAt) { |
94 private static ByteBuffer getBuffer(long startingAt) { |
81 ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8); |
95 ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8); |
82 for (int j = 0; j < LONGS_PER_BUF; j++) { |
96 for (int j = 0; j < LONGS_PER_BUF; j++) { |
83 buf.putLong(startingAt++); |
97 buf.putLong(startingAt++); |
84 } |
98 } |
85 buf.flip(); |
99 buf.flip(); |
86 return buf; |
100 return buf; |
87 } |
101 } |
88 |
102 |
89 @Test(timeOut = 30000) |
103 @Test |
90 public void run() throws IOException { |
104 public void runWithSSLLoopackServer() throws IOException { |
|
105 ExecutorService sslExecutor = Executors.newCachedThreadPool(); |
|
106 |
91 /* Start of wiring */ |
107 /* Start of wiring */ |
92 ExecutorService sslExecutor = Executors.newCachedThreadPool(); |
|
93 /* Emulates an echo server */ |
108 /* Emulates an echo server */ |
94 // FlowTube server = new SSLTube(createSSLEngine(false), |
|
95 // sslExecutor, |
|
96 // new EchoTube(16)); |
|
97 SSLLoopbackSubscriber server = |
109 SSLLoopbackSubscriber server = |
98 new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor); |
110 new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor); |
99 server.start(); |
111 server.start(); |
100 |
112 |
|
113 run(server, sslExecutor); |
|
114 } |
|
115 |
|
116 @Test |
|
117 public void runWithEchoServer() throws IOException { |
|
118 ExecutorService sslExecutor = Executors.newCachedThreadPool(); |
|
119 |
|
120 /* Start of wiring */ |
|
121 /* Emulates an echo server */ |
|
122 FlowTube server = crossOverEchoServer(sslExecutor); |
|
123 |
|
124 run(server, sslExecutor); |
|
125 } |
|
126 |
|
127 private void run(FlowTube server, ExecutorService sslExecutor) throws IOException { |
101 FlowTube client = new SSLTube(createSSLEngine(true), |
128 FlowTube client = new SSLTube(createSSLEngine(true), |
102 sslExecutor, |
129 sslExecutor, |
103 server); |
130 server); |
104 SubmissionPublisher<List<ByteBuffer>> p = |
131 SubmissionPublisher<List<ByteBuffer>> p = |
105 new SubmissionPublisher<>(ForkJoinPool.commonPool(), |
132 new SubmissionPublisher<>(ForkJoinPool.commonPool(), |
328 Thread.sleep(millis); |
358 Thread.sleep(millis); |
329 } catch (InterruptedException e) { |
359 } catch (InterruptedException e) { |
330 |
360 |
331 } |
361 } |
332 } |
362 } |
333 // private static final class EchoTube implements FlowTube { |
363 |
334 // |
364 /** |
335 // private final static Object EOF = new Object(); |
365 * Creates a cross-over FlowTube than can be plugged into a client-side |
336 // private final Executor executor = Executors.newSingleThreadExecutor(); |
366 * SSLTube (in place of the SSLLoopbackSubscriber). |
337 // |
367 * Note that the only method that can be called on the return tube |
338 // private final Queue<Object> queue = new ConcurrentLinkedQueue<>(); |
368 * is connectFlows(). Calling any other method will trigger an |
339 // private final int maxQueueSize; |
369 * InternalError. |
340 // private final SequentialScheduler processingScheduler = |
370 * @param sslExecutor an executor |
341 // new SequentialScheduler(createProcessingTask()); |
371 * @return a cross-over FlowTube connected to an EchoTube. |
342 // |
372 * @throws IOException |
343 // /* Writing into this tube */ |
373 */ |
344 // private long unfulfilled; |
374 FlowTube crossOverEchoServer(Executor sslExecutor) throws IOException { |
345 // private Flow.Subscription subscription; |
375 LateBindingTube crossOver = new LateBindingTube(); |
346 // |
376 FlowTube server = new SSLTube(createSSLEngine(false), |
347 // /* Reading from this tube */ |
377 sslExecutor, |
348 // private final Demand demand = new Demand(); |
378 crossOver); |
349 // private final AtomicBoolean cancelled = new AtomicBoolean(); |
379 EchoTube echo = new EchoTube(6); |
350 // private Flow.Subscriber<? super List<ByteBuffer>> subscriber; |
380 server.connectFlows(FlowTube.asTubePublisher(echo), FlowTube.asTubeSubscriber(echo)); |
351 // |
381 |
352 // private EchoTube(int maxBufferSize) { |
382 return new CrossOverTube(crossOver); |
353 // if (maxBufferSize < 1) |
383 } |
354 // throw new IllegalArgumentException(); |
384 |
355 // this.maxQueueSize = maxBufferSize; |
385 /** |
356 // } |
386 * A cross-over FlowTube that makes it possible to reverse the direction |
357 // |
387 * of flows. The typical usage is to connect an two opposite SSLTube, |
358 // @Override |
388 * one encrypting, one decrypting, to e.g. an EchoTube, with the help |
359 // public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
389 * of a LateBindingTube: |
360 // this.subscriber = subscriber; |
390 * {@code |
361 // System.out.println("EchoTube got subscriber: " + subscriber); |
391 * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube |
362 // this.subscriber.onSubscribe(new InternalSubscription()); |
392 * } |
363 // } |
393 * <p> |
364 // |
394 * Note that the only method that can be called on the CrossOverTube is |
365 // @Override |
395 * connectFlows(). Calling any other method will cause an InternalError to |
366 // public void onSubscribe(Flow.Subscription subscription) { |
396 * be thrown. |
367 // unfulfilled = maxQueueSize; |
397 * Also connectFlows() can be called only once. |
368 // System.out.println("EchoTube request: " + maxQueueSize); |
398 */ |
369 // (this.subscription = subscription).request(maxQueueSize); |
399 private static final class CrossOverTube implements FlowTube { |
370 // } |
400 final LateBindingTube tube; |
371 // |
401 CrossOverTube(LateBindingTube tube) { |
372 // @Override |
402 this.tube = tube; |
373 // public void onNext(List<ByteBuffer> item) { |
403 } |
374 // if (--unfulfilled == (maxQueueSize / 2)) { |
404 |
375 // long req = maxQueueSize - unfulfilled; |
405 @Override |
376 // subscription.request(req); |
406 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
377 // System.out.println("EchoTube request: " + req); |
407 throw newInternalError(); |
378 // unfulfilled = maxQueueSize; |
408 } |
379 // } |
409 |
380 // System.out.println("EchoTube add " + Utils.remaining(item)); |
410 @Override |
381 // queue.add(item); |
411 public void connectFlows(TubePublisher writePublisher, TubeSubscriber readSubscriber) { |
382 // processingScheduler.deferOrSchedule(executor); |
412 tube.start(writePublisher, readSubscriber); |
383 // } |
413 } |
384 // |
414 |
385 // @Override |
415 @Override |
386 // public void onError(Throwable throwable) { |
416 public boolean isFinished() { |
387 // System.out.println("EchoTube add " + throwable); |
417 return tube.isFinished(); |
388 // queue.add(throwable); |
418 } |
389 // processingScheduler.deferOrSchedule(executor); |
419 |
390 // } |
420 Error newInternalError() { |
391 // |
421 InternalError error = new InternalError(); |
392 // @Override |
422 error.printStackTrace(System.out); |
393 // public void onComplete() { |
423 return error; |
394 // System.out.println("EchoTube add EOF"); |
424 } |
395 // queue.add(EOF); |
425 |
396 // processingScheduler.deferOrSchedule(executor); |
426 @Override |
397 // } |
427 public void onSubscribe(Flow.Subscription subscription) { |
398 // |
428 throw newInternalError(); |
399 // @Override |
429 } |
400 // public boolean isFinished() { |
430 |
401 // return false; |
431 @Override |
402 // } |
432 public void onError(Throwable throwable) { |
403 // |
433 throw newInternalError(); |
404 // private class InternalSubscription implements Flow.Subscription { |
434 } |
405 // |
435 |
406 // @Override |
436 @Override |
407 // public void request(long n) { |
437 public void onComplete() { |
408 // System.out.println("EchoTube got request: " + n); |
438 throw newInternalError(); |
409 // if (n <= 0) { |
439 } |
410 // throw new InternalError(); |
440 |
411 // } |
441 @Override |
412 // demand.increase(n); |
442 public void onNext(List<ByteBuffer> item) { |
413 // processingScheduler.runOrSchedule(); |
443 throw newInternalError(); |
414 // } |
444 } |
415 // |
445 } |
416 // @Override |
446 |
417 // public void cancel() { |
447 /** |
418 // cancelled.set(true); |
448 * A late binding tube that makes it possible to create an |
419 // } |
449 * SSLTube before the right-hand-side tube has been created. |
420 // } |
450 * The typical usage is to make it possible to connect two |
421 // |
451 * opposite SSLTube (one encrypting, one decrypting) through a |
422 // @Override |
452 * CrossOverTube: |
423 // public String toString() { |
453 * {@code |
424 // return "EchoTube"; |
454 * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube |
425 // } |
455 * } |
426 // |
456 * <p> |
427 // private SequentialScheduler.RestartableTask createProcessingTask() { |
457 * Note that this class only supports a single call to start(): it cannot be |
428 // return new SequentialScheduler.CompleteRestartableTask() { |
458 * subscribed more than once from its left-hand-side (the cross over tube side). |
429 // |
459 */ |
430 // @Override |
460 private static class LateBindingTube implements FlowTube { |
431 // protected void run() { |
461 |
432 // try { |
462 final CompletableFuture<Flow.Publisher<List<ByteBuffer>>> futurePublisher |
433 // while (!cancelled.get()) { |
463 = new CompletableFuture<>(); |
434 // Object item = queue.peek(); |
464 final ConcurrentLinkedQueue<Consumer<Flow.Subscriber<? super List<ByteBuffer>>>> queue |
435 // if (item == null) |
465 = new ConcurrentLinkedQueue<>(); |
436 // return; |
466 AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>(); |
437 // try { |
467 SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop); |
438 // System.out.println("EchoTube processing item"); |
468 AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
439 // if (item instanceof List) { |
469 private volatile boolean finished; |
440 // if (!demand.tryDecrement()) { |
470 private volatile boolean completed; |
441 // System.out.println("EchoTube no demand"); |
471 |
442 // return; |
472 |
443 // } |
473 public void start(Flow.Publisher<List<ByteBuffer>> publisher, |
444 // @SuppressWarnings("unchecked") |
474 Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
445 // List<ByteBuffer> bytes = (List<ByteBuffer>) item; |
475 subscriberRef.set(subscriber); |
446 // Object removed = queue.remove(); |
476 futurePublisher.complete(publisher); |
447 // assert removed == item; |
477 scheduler.runOrSchedule(); |
448 // System.out.println("EchoTube processing " |
478 } |
449 // + Utils.remaining(bytes)); |
479 |
450 // subscriber.onNext(bytes); |
480 @Override |
451 // } else if (item instanceof Throwable) { |
481 public boolean isFinished() { |
452 // cancelled.set(true); |
482 return finished; |
453 // Object removed = queue.remove(); |
483 } |
454 // assert removed == item; |
484 |
455 // System.out.println("EchoTube processing " + item); |
485 @Override |
456 // subscriber.onError((Throwable) item); |
486 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
457 // } else if (item == EOF) { |
487 futurePublisher.thenAccept((p) -> p.subscribe(subscriber)); |
458 // cancelled.set(true); |
488 scheduler.runOrSchedule(); |
459 // Object removed = queue.remove(); |
489 } |
460 // assert removed == item; |
490 |
461 // System.out.println("EchoTube processing EOF"); |
491 @Override |
462 // subscriber.onComplete(); |
492 public void onSubscribe(Flow.Subscription subscription) { |
463 // } else { |
493 queue.add((s) -> s.onSubscribe(subscription)); |
464 // throw new InternalError(String.valueOf(item)); |
494 scheduler.runOrSchedule(); |
465 // } |
495 } |
466 // } finally { |
496 |
467 // } |
497 @Override |
468 // } |
498 public void onNext(List<ByteBuffer> item) { |
469 // } catch(Throwable t) { |
499 queue.add((s) -> s.onNext(item)); |
470 // t.printStackTrace(); |
500 scheduler.runOrSchedule(); |
471 // throw t; |
501 } |
472 // } |
502 |
473 // } |
503 @Override |
474 // }; |
504 public void onError(Throwable throwable) { |
475 // } |
505 System.out.println("LateBindingTube onError"); |
476 // } |
506 throwable.printStackTrace(System.out); |
|
507 queue.add((s) -> { |
|
508 errorRef.compareAndSet(null, throwable); |
|
509 try { |
|
510 System.out.println("LateBindingTube subscriber onError: " + throwable); |
|
511 s.onError(errorRef.get()); |
|
512 } finally { |
|
513 finished = true; |
|
514 System.out.println("LateBindingTube finished"); |
|
515 } |
|
516 }); |
|
517 scheduler.runOrSchedule(); |
|
518 } |
|
519 |
|
520 @Override |
|
521 public void onComplete() { |
|
522 System.out.println("LateBindingTube completing"); |
|
523 queue.add((s) -> { |
|
524 completed = true; |
|
525 try { |
|
526 System.out.println("LateBindingTube complete subscriber"); |
|
527 s.onComplete(); |
|
528 } finally { |
|
529 finished = true; |
|
530 System.out.println("LateBindingTube finished"); |
|
531 } |
|
532 }); |
|
533 scheduler.runOrSchedule(); |
|
534 } |
|
535 |
|
536 private void loop() { |
|
537 if (finished) { |
|
538 scheduler.stop(); |
|
539 return; |
|
540 } |
|
541 Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get(); |
|
542 if (subscriber == null) return; |
|
543 try { |
|
544 Consumer<Flow.Subscriber<? super List<ByteBuffer>>> s; |
|
545 while ((s = queue.poll()) != null) { |
|
546 s.accept(subscriber); |
|
547 } |
|
548 } catch (Throwable t) { |
|
549 if (errorRef.compareAndSet(null, t)) { |
|
550 onError(t); |
|
551 } |
|
552 } |
|
553 } |
|
554 } |
|
555 |
|
556 /** |
|
557 * An echo tube that just echoes back whatever bytes it receives. |
|
558 * This cannot be plugged to the right-hand-side of an SSLTube |
|
559 * since handshake data cannot be simply echoed back, and |
|
560 * application data most likely also need to be decrypted and |
|
561 * re-encrypted. |
|
562 */ |
|
563 private static final class EchoTube implements FlowTube { |
|
564 |
|
565 private final static Object EOF = new Object(); |
|
566 private final Executor executor = Executors.newSingleThreadExecutor(); |
|
567 |
|
568 private final Queue<Object> queue = new ConcurrentLinkedQueue<>(); |
|
569 private final int maxQueueSize; |
|
570 private final SequentialScheduler processingScheduler = |
|
571 new SequentialScheduler(createProcessingTask()); |
|
572 |
|
573 /* Writing into this tube */ |
|
574 private volatile long requested; |
|
575 private Flow.Subscription subscription; |
|
576 |
|
577 /* Reading from this tube */ |
|
578 private final Demand demand = new Demand(); |
|
579 private final AtomicBoolean cancelled = new AtomicBoolean(); |
|
580 private Flow.Subscriber<? super List<ByteBuffer>> subscriber; |
|
581 |
|
582 private EchoTube(int maxBufferSize) { |
|
583 if (maxBufferSize < 1) |
|
584 throw new IllegalArgumentException(); |
|
585 this.maxQueueSize = maxBufferSize; |
|
586 } |
|
587 |
|
588 @Override |
|
589 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
|
590 this.subscriber = subscriber; |
|
591 System.out.println("EchoTube got subscriber: " + subscriber); |
|
592 this.subscriber.onSubscribe(new InternalSubscription()); |
|
593 } |
|
594 |
|
595 @Override |
|
596 public void onSubscribe(Flow.Subscription subscription) { |
|
597 System.out.println("EchoTube request: " + maxQueueSize); |
|
598 (this.subscription = subscription).request(requested = maxQueueSize); |
|
599 } |
|
600 |
|
601 private void requestMore() { |
|
602 Flow.Subscription s = subscription; |
|
603 if (s == null || cancelled.get()) return; |
|
604 long unfulfilled = queue.size() + --requested; |
|
605 if (unfulfilled <= maxQueueSize/2) { |
|
606 long req = maxQueueSize - unfulfilled; |
|
607 requested += req; |
|
608 s.request(req); |
|
609 System.out.printf("EchoTube request: %s [requested:%s, queue:%s, unfulfilled:%s]%n", |
|
610 req, requested-req, queue.size(), unfulfilled ); |
|
611 } |
|
612 } |
|
613 |
|
614 @Override |
|
615 public void onNext(List<ByteBuffer> item) { |
|
616 System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n", |
|
617 Utils.remaining(item), requested, queue.size()); |
|
618 queue.add(item); |
|
619 processingScheduler.deferOrSchedule(executor); |
|
620 } |
|
621 |
|
622 @Override |
|
623 public void onError(Throwable throwable) { |
|
624 System.out.println("EchoTube add " + throwable); |
|
625 queue.add(throwable); |
|
626 processingScheduler.deferOrSchedule(executor); |
|
627 } |
|
628 |
|
629 @Override |
|
630 public void onComplete() { |
|
631 System.out.println("EchoTube add EOF"); |
|
632 queue.add(EOF); |
|
633 processingScheduler.deferOrSchedule(executor); |
|
634 } |
|
635 |
|
636 @Override |
|
637 public boolean isFinished() { |
|
638 return cancelled.get(); |
|
639 } |
|
640 |
|
641 private class InternalSubscription implements Flow.Subscription { |
|
642 |
|
643 @Override |
|
644 public void request(long n) { |
|
645 System.out.println("EchoTube got request: " + n); |
|
646 if (n <= 0) { |
|
647 throw new InternalError(); |
|
648 } |
|
649 if (demand.increase(n)) { |
|
650 processingScheduler.deferOrSchedule(executor); |
|
651 } |
|
652 } |
|
653 |
|
654 @Override |
|
655 public void cancel() { |
|
656 cancelled.set(true); |
|
657 } |
|
658 } |
|
659 |
|
660 @Override |
|
661 public String toString() { |
|
662 return "EchoTube"; |
|
663 } |
|
664 |
|
665 int transmitted = 0; |
|
666 private SequentialScheduler.RestartableTask createProcessingTask() { |
|
667 return new SequentialScheduler.CompleteRestartableTask() { |
|
668 |
|
669 @Override |
|
670 protected void run() { |
|
671 try { |
|
672 while (!cancelled.get()) { |
|
673 Object item = queue.peek(); |
|
674 if (item == null) { |
|
675 System.out.printf("EchoTube: queue empty, requested=%s, demand=%s, transmitted=%s%n", |
|
676 requested, demand.get(), transmitted); |
|
677 requestMore(); |
|
678 return; |
|
679 } |
|
680 try { |
|
681 System.out.printf("EchoTube processing item, requested=%s, demand=%s, transmitted=%s%n", |
|
682 requested, demand.get(), transmitted); |
|
683 if (item instanceof List) { |
|
684 if (!demand.tryDecrement()) { |
|
685 System.out.println("EchoTube no demand"); |
|
686 return; |
|
687 } |
|
688 @SuppressWarnings("unchecked") |
|
689 List<ByteBuffer> bytes = (List<ByteBuffer>) item; |
|
690 Object removed = queue.remove(); |
|
691 assert removed == item; |
|
692 System.out.println("EchoTube processing " |
|
693 + Utils.remaining(bytes)); |
|
694 transmitted++; |
|
695 subscriber.onNext(bytes); |
|
696 requestMore(); |
|
697 } else if (item instanceof Throwable) { |
|
698 cancelled.set(true); |
|
699 Object removed = queue.remove(); |
|
700 assert removed == item; |
|
701 System.out.println("EchoTube processing " + item); |
|
702 subscriber.onError((Throwable) item); |
|
703 } else if (item == EOF) { |
|
704 cancelled.set(true); |
|
705 Object removed = queue.remove(); |
|
706 assert removed == item; |
|
707 System.out.println("EchoTube processing EOF"); |
|
708 subscriber.onComplete(); |
|
709 } else { |
|
710 throw new InternalError(String.valueOf(item)); |
|
711 } |
|
712 } finally { |
|
713 } |
|
714 } |
|
715 } catch(Throwable t) { |
|
716 t.printStackTrace(); |
|
717 throw t; |
|
718 } |
|
719 } |
|
720 }; |
|
721 } |
|
722 } |
477 |
723 |
478 /** |
724 /** |
479 * The final subscriber which receives the decrypted looped-back data. Just |
725 * The final subscriber which receives the decrypted looped-back data. Just |
480 * needs to compare the data with what was sent. The given CF is either |
726 * needs to compare the data with what was sent. The given CF is either |
481 * completed exceptionally with an error or normally on success. |
727 * completed exceptionally with an error or normally on success. |