test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java
branchhttp-client-branch
changeset 55942 8d4770c22b63
parent 55909 583695a0ed6a
child 55947 c4f314605d28
equal deleted inserted replaced
55941:2d423c9b73bb 55942:8d4770c22b63
    53 import java.security.NoSuchAlgorithmException;
    53 import java.security.NoSuchAlgorithmException;
    54 import java.security.UnrecoverableKeyException;
    54 import java.security.UnrecoverableKeyException;
    55 import java.security.cert.CertificateException;
    55 import java.security.cert.CertificateException;
    56 import java.util.List;
    56 import java.util.List;
    57 import java.util.Queue;
    57 import java.util.Queue;
       
    58 import java.util.Random;
    58 import java.util.StringTokenizer;
    59 import java.util.StringTokenizer;
    59 import java.util.concurrent.BlockingQueue;
    60 import java.util.concurrent.BlockingQueue;
    60 import java.util.concurrent.CompletableFuture;
    61 import java.util.concurrent.CompletableFuture;
    61 import java.util.concurrent.ConcurrentLinkedQueue;
    62 import java.util.concurrent.ConcurrentLinkedQueue;
    62 import java.util.concurrent.Executor;
    63 import java.util.concurrent.Executor;
    67 import java.util.concurrent.LinkedBlockingQueue;
    68 import java.util.concurrent.LinkedBlockingQueue;
    68 import java.util.concurrent.SubmissionPublisher;
    69 import java.util.concurrent.SubmissionPublisher;
    69 import java.util.concurrent.atomic.AtomicBoolean;
    70 import java.util.concurrent.atomic.AtomicBoolean;
    70 import java.util.concurrent.atomic.AtomicInteger;
    71 import java.util.concurrent.atomic.AtomicInteger;
    71 import java.util.concurrent.atomic.AtomicLong;
    72 import java.util.concurrent.atomic.AtomicLong;
       
    73 import java.util.concurrent.atomic.AtomicReference;
       
    74 import java.util.function.Consumer;
    72 
    75 
    73 @Test
    76 @Test
    74 public class SSLTubeTest {
    77 public class SSLTubeTest {
    75 
    78 
    76     private static final long COUNTER = 600;
    79     private static final long COUNTER = 600;
    77     private static final int LONGS_PER_BUF = 800;
    80     private static final int LONGS_PER_BUF = 800;
    78     private static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
    81     private static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
       
    82     public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
       
    83 
       
    84     static final Random rand = new Random();
       
    85 
       
    86     static int randomRange(int lower, int upper) {
       
    87         if (lower > upper)
       
    88             throw new IllegalArgumentException("lower > upper");
       
    89         int diff = upper - lower;
       
    90         int r = lower + rand.nextInt(diff);
       
    91         return r - (r % 8); // round down to multiple of 8 (align for longs)
       
    92     }
    79 
    93 
    80     private static ByteBuffer getBuffer(long startingAt) {
    94     private static ByteBuffer getBuffer(long startingAt) {
    81         ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8);
    95         ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8);
    82         for (int j = 0; j < LONGS_PER_BUF; j++) {
    96         for (int j = 0; j < LONGS_PER_BUF; j++) {
    83             buf.putLong(startingAt++);
    97             buf.putLong(startingAt++);
    84         }
    98         }
    85         buf.flip();
    99         buf.flip();
    86         return buf;
   100         return buf;
    87     }
   101     }
    88 
   102 
    89     @Test(timeOut = 30000)
   103     @Test
    90     public void run() throws IOException {
   104     public void runWithSSLLoopackServer() throws IOException {
       
   105         ExecutorService sslExecutor = Executors.newCachedThreadPool();
       
   106 
    91         /* Start of wiring */
   107         /* Start of wiring */
    92         ExecutorService sslExecutor = Executors.newCachedThreadPool();
       
    93         /* Emulates an echo server */
   108         /* Emulates an echo server */
    94 //        FlowTube server = new SSLTube(createSSLEngine(false),
       
    95 //                                      sslExecutor,
       
    96 //                                      new EchoTube(16));
       
    97         SSLLoopbackSubscriber server =
   109         SSLLoopbackSubscriber server =
    98                 new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
   110                 new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
    99         server.start();
   111         server.start();
   100 
   112 
       
   113         run(server, sslExecutor);
       
   114     }
       
   115 
       
   116     @Test
       
   117     public void runWithEchoServer() throws IOException {
       
   118         ExecutorService sslExecutor = Executors.newCachedThreadPool();
       
   119 
       
   120         /* Start of wiring */
       
   121         /* Emulates an echo server */
       
   122         FlowTube server = crossOverEchoServer(sslExecutor);
       
   123 
       
   124         run(server, sslExecutor);
       
   125     }
       
   126 
       
   127     private void run(FlowTube server, ExecutorService sslExecutor) throws IOException {
   101         FlowTube client = new SSLTube(createSSLEngine(true),
   128         FlowTube client = new SSLTube(createSSLEngine(true),
   102                                       sslExecutor,
   129                                       sslExecutor,
   103                                       server);
   130                                       server);
   104         SubmissionPublisher<List<ByteBuffer>> p =
   131         SubmissionPublisher<List<ByteBuffer>> p =
   105                 new SubmissionPublisher<>(ForkJoinPool.commonPool(),
   132                 new SubmissionPublisher<>(ForkJoinPool.commonPool(),
   126         } finally {
   153         } finally {
   127             sslExecutor.shutdownNow();
   154             sslExecutor.shutdownNow();
   128         }
   155         }
   129     }
   156     }
   130 
   157 
       
   158     /**
       
   159      * This is a copy of the SSLLoopbackSubscriber used in FlowTest
       
   160      */
   131     static class SSLLoopbackSubscriber implements FlowTube {
   161     static class SSLLoopbackSubscriber implements FlowTube {
   132         private final BlockingQueue<ByteBuffer> buffer;
   162         private final BlockingQueue<ByteBuffer> buffer;
   133         private final Socket clientSock;
   163         private final Socket clientSock;
   134         private final SSLSocket serverSock;
   164         private final SSLSocket serverSock;
   135         private final Thread thread1, thread2, thread3;
   165         private final Thread thread1, thread2, thread3;
   171 
   201 
   172         // reads off the SSLSocket the data from the "server"
   202         // reads off the SSLSocket the data from the "server"
   173         private void clientReader() {
   203         private void clientReader() {
   174             try {
   204             try {
   175                 InputStream is = clientSock.getInputStream();
   205                 InputStream is = clientSock.getInputStream();
   176                 final int bufsize = FlowTest.randomRange(512, 16 * 1024);
   206                 final int bufsize = randomRange(512, 16 * 1024);
   177                 System.out.println("clientReader: bufsize = " + bufsize);
   207                 System.out.println("clientReader: bufsize = " + bufsize);
   178                 while (true) {
   208                 while (true) {
   179                     byte[] buf = new byte[bufsize];
   209                     byte[] buf = new byte[bufsize];
   180                     int n = is.read(buf);
   210                     int n = is.read(buf);
   181                     if (n == -1) {
   211                     if (n == -1) {
   204                 OutputStream os =
   234                 OutputStream os =
   205                         new BufferedOutputStream(clientSock.getOutputStream());
   235                         new BufferedOutputStream(clientSock.getOutputStream());
   206 
   236 
   207                 while (true) {
   237                 while (true) {
   208                     ByteBuffer buf = buffer.take();
   238                     ByteBuffer buf = buffer.take();
   209                     if (buf == FlowTest.SENTINEL) {
   239                     if (buf == SENTINEL) {
   210                         // finished
   240                         // finished
   211                         //Utils.sleep(2000);
   241                         //Utils.sleep(2000);
   212                         System.out.println("clientWriter close: " + nbytes + " written");
   242                         System.out.println("clientWriter close: " + nbytes + " written");
   213                         clientSock.shutdownOutput();
   243                         clientSock.shutdownOutput();
   214                         System.out.println("clientWriter close return");
   244                         System.out.println("clientWriter close return");
   247         // thread2
   277         // thread2
   248         private void serverLoopback() {
   278         private void serverLoopback() {
   249             try {
   279             try {
   250                 InputStream is = serverSock.getInputStream();
   280                 InputStream is = serverSock.getInputStream();
   251                 OutputStream os = serverSock.getOutputStream();
   281                 OutputStream os = serverSock.getOutputStream();
   252                 final int bufsize = FlowTest.randomRange(512, 16 * 1024);
   282                 final int bufsize = randomRange(512, 16 * 1024);
   253                 System.out.println("serverLoopback: bufsize = " + bufsize);
   283                 System.out.println("serverLoopback: bufsize = " + bufsize);
   254                 byte[] bb = new byte[bufsize];
   284                 byte[] bb = new byte[bufsize];
   255                 while (true) {
   285                 while (true) {
   256                     int n = is.read(bb);
   286                     int n = is.read(bb);
   257                     if (n == -1) {
   287                     if (n == -1) {
   303         }
   333         }
   304 
   334 
   305         @Override
   335         @Override
   306         public void onComplete() {
   336         public void onComplete() {
   307             try {
   337             try {
   308                 buffer.put(FlowTest.SENTINEL);
   338                 buffer.put(SENTINEL);
   309             } catch (InterruptedException e) {
   339             } catch (InterruptedException e) {
   310                 e.printStackTrace();
   340                 e.printStackTrace();
   311                 Utils.close(clientSock);
   341                 Utils.close(clientSock);
   312             }
   342             }
   313         }
   343         }
   328             Thread.sleep(millis);
   358             Thread.sleep(millis);
   329         } catch (InterruptedException e) {
   359         } catch (InterruptedException e) {
   330 
   360 
   331         }
   361         }
   332     }
   362     }
   333 //    private static final class EchoTube implements FlowTube {
   363 
   334 //
   364     /**
   335 //        private final static Object EOF = new Object();
   365      * Creates a cross-over FlowTube than can be plugged into a client-side
   336 //        private final Executor executor = Executors.newSingleThreadExecutor();
   366      * SSLTube (in place of the SSLLoopbackSubscriber).
   337 //
   367      * Note that the only method that can be called on the return tube
   338 //        private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
   368      * is connectFlows(). Calling any other method will trigger an
   339 //        private final int maxQueueSize;
   369      * InternalError.
   340 //        private final SequentialScheduler processingScheduler =
   370      * @param sslExecutor an executor
   341 //                new SequentialScheduler(createProcessingTask());
   371      * @return a cross-over FlowTube connected to an EchoTube.
   342 //
   372      * @throws IOException
   343 //        /* Writing into this tube */
   373      */
   344 //        private long unfulfilled;
   374     FlowTube crossOverEchoServer(Executor sslExecutor) throws IOException {
   345 //        private Flow.Subscription subscription;
   375         LateBindingTube crossOver = new LateBindingTube();
   346 //
   376         FlowTube server = new SSLTube(createSSLEngine(false),
   347 //        /* Reading from this tube */
   377                                       sslExecutor,
   348 //        private final Demand demand = new Demand();
   378                                       crossOver);
   349 //        private final AtomicBoolean cancelled = new AtomicBoolean();
   379         EchoTube echo = new EchoTube(6);
   350 //        private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
   380         server.connectFlows(FlowTube.asTubePublisher(echo), FlowTube.asTubeSubscriber(echo));
   351 //
   381 
   352 //        private EchoTube(int maxBufferSize) {
   382         return new CrossOverTube(crossOver);
   353 //            if (maxBufferSize < 1)
   383     }
   354 //                throw new IllegalArgumentException();
   384 
   355 //            this.maxQueueSize = maxBufferSize;
   385     /**
   356 //        }
   386      * A cross-over FlowTube that makes it possible to reverse the direction
   357 //
   387      * of flows. The typical usage is to connect an two opposite SSLTube,
   358 //        @Override
   388      * one encrypting, one decrypting, to e.g. an EchoTube, with the help
   359 //        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
   389      * of a LateBindingTube:
   360 //            this.subscriber = subscriber;
   390      * {@code
   361 //            System.out.println("EchoTube got subscriber: " + subscriber);
   391      * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube
   362 //            this.subscriber.onSubscribe(new InternalSubscription());
   392      * }
   363 //        }
   393      * <p>
   364 //
   394      * Note that the only method that can be called on the CrossOverTube is
   365 //        @Override
   395      * connectFlows(). Calling any other method will cause an InternalError to
   366 //        public void onSubscribe(Flow.Subscription subscription) {
   396      * be thrown.
   367 //            unfulfilled = maxQueueSize;
   397      * Also connectFlows() can be called only once.
   368 //            System.out.println("EchoTube request: " + maxQueueSize);
   398      */
   369 //            (this.subscription = subscription).request(maxQueueSize);
   399     private static final class CrossOverTube implements FlowTube {
   370 //        }
   400         final LateBindingTube tube;
   371 //
   401         CrossOverTube(LateBindingTube tube) {
   372 //        @Override
   402             this.tube = tube;
   373 //        public void onNext(List<ByteBuffer> item) {
   403         }
   374 //            if (--unfulfilled == (maxQueueSize / 2)) {
   404 
   375 //                long req = maxQueueSize - unfulfilled;
   405         @Override
   376 //                subscription.request(req);
   406         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
   377 //                System.out.println("EchoTube request: " + req);
   407             throw newInternalError();
   378 //                unfulfilled = maxQueueSize;
   408         }
   379 //            }
   409 
   380 //            System.out.println("EchoTube add " + Utils.remaining(item));
   410         @Override
   381 //            queue.add(item);
   411         public void connectFlows(TubePublisher writePublisher, TubeSubscriber readSubscriber) {
   382 //            processingScheduler.deferOrSchedule(executor);
   412             tube.start(writePublisher, readSubscriber);
   383 //        }
   413         }
   384 //
   414 
   385 //        @Override
   415         @Override
   386 //        public void onError(Throwable throwable) {
   416         public boolean isFinished() {
   387 //            System.out.println("EchoTube add " + throwable);
   417             return tube.isFinished();
   388 //            queue.add(throwable);
   418         }
   389 //            processingScheduler.deferOrSchedule(executor);
   419 
   390 //        }
   420         Error newInternalError() {
   391 //
   421             InternalError error = new InternalError();
   392 //        @Override
   422             error.printStackTrace(System.out);
   393 //        public void onComplete() {
   423             return error;
   394 //            System.out.println("EchoTube add EOF");
   424         }
   395 //            queue.add(EOF);
   425 
   396 //            processingScheduler.deferOrSchedule(executor);
   426         @Override
   397 //        }
   427         public void onSubscribe(Flow.Subscription subscription) {
   398 //
   428             throw newInternalError();
   399 //        @Override
   429         }
   400 //        public boolean isFinished() {
   430 
   401 //            return false;
   431         @Override
   402 //        }
   432         public void onError(Throwable throwable) {
   403 //
   433             throw newInternalError();
   404 //        private class InternalSubscription implements Flow.Subscription {
   434         }
   405 //
   435 
   406 //            @Override
   436         @Override
   407 //            public void request(long n) {
   437         public void onComplete() {
   408 //                System.out.println("EchoTube got request: " + n);
   438             throw newInternalError();
   409 //                if (n <= 0) {
   439         }
   410 //                    throw new InternalError();
   440 
   411 //                }
   441         @Override
   412 //                demand.increase(n);
   442         public void onNext(List<ByteBuffer> item) {
   413 //                processingScheduler.runOrSchedule();
   443             throw newInternalError();
   414 //            }
   444         }
   415 //
   445     }
   416 //            @Override
   446 
   417 //            public void cancel() {
   447     /**
   418 //                cancelled.set(true);
   448      * A late binding tube that makes it possible to create an
   419 //            }
   449      * SSLTube before the right-hand-side tube has been created.
   420 //        }
   450      * The typical usage is to make it possible to connect two
   421 //
   451      * opposite SSLTube (one encrypting, one decrypting) through a
   422 //        @Override
   452      * CrossOverTube:
   423 //        public String toString() {
   453      * {@code
   424 //            return "EchoTube";
   454      * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube
   425 //        }
   455      * }
   426 //
   456      * <p>
   427 //        private SequentialScheduler.RestartableTask createProcessingTask() {
   457      * Note that this class only supports a single call to start(): it cannot be
   428 //            return new SequentialScheduler.CompleteRestartableTask() {
   458      * subscribed more than once from its left-hand-side (the cross over tube side).
   429 //
   459      */
   430 //                @Override
   460     private static class LateBindingTube implements FlowTube {
   431 //                protected void run() {
   461 
   432 //                    try {
   462         final CompletableFuture<Flow.Publisher<List<ByteBuffer>>> futurePublisher
   433 //                        while (!cancelled.get()) {
   463                 = new CompletableFuture<>();
   434 //                            Object item = queue.peek();
   464         final ConcurrentLinkedQueue<Consumer<Flow.Subscriber<? super List<ByteBuffer>>>> queue
   435 //                            if (item == null)
   465                 = new ConcurrentLinkedQueue<>();
   436 //                                return;
   466         AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
   437 //                            try {
   467         SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
   438 //                                System.out.println("EchoTube processing item");
   468         AtomicReference<Throwable> errorRef = new AtomicReference<>();
   439 //                                if (item instanceof List) {
   469         private volatile boolean finished;
   440 //                                    if (!demand.tryDecrement()) {
   470         private volatile boolean completed;
   441 //                                        System.out.println("EchoTube no demand");
   471 
   442 //                                        return;
   472 
   443 //                                    }
   473         public void start(Flow.Publisher<List<ByteBuffer>> publisher,
   444 //                                    @SuppressWarnings("unchecked")
   474                           Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
   445 //                                    List<ByteBuffer> bytes = (List<ByteBuffer>) item;
   475             subscriberRef.set(subscriber);
   446 //                                    Object removed = queue.remove();
   476             futurePublisher.complete(publisher);
   447 //                                    assert removed == item;
   477             scheduler.runOrSchedule();
   448 //                                    System.out.println("EchoTube processing "
   478         }
   449 //                                            + Utils.remaining(bytes));
   479 
   450 //                                    subscriber.onNext(bytes);
   480         @Override
   451 //                                } else if (item instanceof Throwable) {
   481         public boolean isFinished() {
   452 //                                    cancelled.set(true);
   482             return finished;
   453 //                                    Object removed = queue.remove();
   483         }
   454 //                                    assert removed == item;
   484 
   455 //                                    System.out.println("EchoTube processing " + item);
   485         @Override
   456 //                                    subscriber.onError((Throwable) item);
   486         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
   457 //                                } else if (item == EOF) {
   487             futurePublisher.thenAccept((p) -> p.subscribe(subscriber));
   458 //                                    cancelled.set(true);
   488             scheduler.runOrSchedule();
   459 //                                    Object removed = queue.remove();
   489         }
   460 //                                    assert removed == item;
   490 
   461 //                                    System.out.println("EchoTube processing EOF");
   491         @Override
   462 //                                    subscriber.onComplete();
   492         public void onSubscribe(Flow.Subscription subscription) {
   463 //                                } else {
   493             queue.add((s) -> s.onSubscribe(subscription));
   464 //                                    throw new InternalError(String.valueOf(item));
   494             scheduler.runOrSchedule();
   465 //                                }
   495         }
   466 //                            } finally {
   496 
   467 //                            }
   497         @Override
   468 //                        }
   498         public void onNext(List<ByteBuffer> item) {
   469 //                    } catch(Throwable t) {
   499             queue.add((s) -> s.onNext(item));
   470 //                        t.printStackTrace();
   500             scheduler.runOrSchedule();
   471 //                        throw t;
   501         }
   472 //                    }
   502 
   473 //                }
   503         @Override
   474 //            };
   504         public void onError(Throwable throwable) {
   475 //        }
   505             System.out.println("LateBindingTube onError");
   476 //    }
   506             throwable.printStackTrace(System.out);
       
   507             queue.add((s) -> {
       
   508                 errorRef.compareAndSet(null, throwable);
       
   509                 try {
       
   510                     System.out.println("LateBindingTube subscriber onError: " + throwable);
       
   511                     s.onError(errorRef.get());
       
   512                 } finally {
       
   513                     finished = true;
       
   514                     System.out.println("LateBindingTube finished");
       
   515                 }
       
   516             });
       
   517             scheduler.runOrSchedule();
       
   518         }
       
   519 
       
   520         @Override
       
   521         public void onComplete() {
       
   522             System.out.println("LateBindingTube completing");
       
   523             queue.add((s) -> {
       
   524                 completed = true;
       
   525                 try {
       
   526                     System.out.println("LateBindingTube complete subscriber");
       
   527                     s.onComplete();
       
   528                 } finally {
       
   529                     finished = true;
       
   530                     System.out.println("LateBindingTube finished");
       
   531                 }
       
   532             });
       
   533             scheduler.runOrSchedule();
       
   534         }
       
   535 
       
   536         private void loop() {
       
   537             if (finished) {
       
   538                 scheduler.stop();
       
   539                 return;
       
   540             }
       
   541             Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get();
       
   542             if (subscriber == null) return;
       
   543             try {
       
   544                 Consumer<Flow.Subscriber<? super List<ByteBuffer>>> s;
       
   545                 while ((s = queue.poll()) != null) {
       
   546                     s.accept(subscriber);
       
   547                 }
       
   548             } catch (Throwable t) {
       
   549                 if (errorRef.compareAndSet(null, t)) {
       
   550                     onError(t);
       
   551                 }
       
   552             }
       
   553         }
       
   554     }
       
   555 
       
   556     /**
       
   557      * An echo tube that just echoes back whatever bytes it receives.
       
   558      * This cannot be plugged to the right-hand-side of an SSLTube
       
   559      * since handshake data cannot be simply echoed back, and
       
   560      * application data most likely also need to be decrypted and
       
   561      * re-encrypted.
       
   562      */
       
   563     private static final class EchoTube implements FlowTube {
       
   564 
       
   565         private final static Object EOF = new Object();
       
   566         private final Executor executor = Executors.newSingleThreadExecutor();
       
   567 
       
   568         private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
       
   569         private final int maxQueueSize;
       
   570         private final SequentialScheduler processingScheduler =
       
   571                 new SequentialScheduler(createProcessingTask());
       
   572 
       
   573         /* Writing into this tube */
       
   574         private volatile long requested;
       
   575         private Flow.Subscription subscription;
       
   576 
       
   577         /* Reading from this tube */
       
   578         private final Demand demand = new Demand();
       
   579         private final AtomicBoolean cancelled = new AtomicBoolean();
       
   580         private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
       
   581 
       
   582         private EchoTube(int maxBufferSize) {
       
   583             if (maxBufferSize < 1)
       
   584                 throw new IllegalArgumentException();
       
   585             this.maxQueueSize = maxBufferSize;
       
   586         }
       
   587 
       
   588         @Override
       
   589         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   590             this.subscriber = subscriber;
       
   591             System.out.println("EchoTube got subscriber: " + subscriber);
       
   592             this.subscriber.onSubscribe(new InternalSubscription());
       
   593         }
       
   594 
       
   595         @Override
       
   596         public void onSubscribe(Flow.Subscription subscription) {
       
   597             System.out.println("EchoTube request: " + maxQueueSize);
       
   598             (this.subscription = subscription).request(requested = maxQueueSize);
       
   599         }
       
   600 
       
   601         private void requestMore() {
       
   602             Flow.Subscription s = subscription;
       
   603             if (s == null || cancelled.get()) return;
       
   604             long unfulfilled = queue.size() + --requested;
       
   605             if (unfulfilled <= maxQueueSize/2) {
       
   606                 long req = maxQueueSize - unfulfilled;
       
   607                 requested += req;
       
   608                 s.request(req);
       
   609                 System.out.printf("EchoTube request: %s [requested:%s, queue:%s, unfulfilled:%s]%n",
       
   610                         req, requested-req, queue.size(), unfulfilled );
       
   611             }
       
   612         }
       
   613 
       
   614         @Override
       
   615         public void onNext(List<ByteBuffer> item) {
       
   616             System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n",
       
   617                     Utils.remaining(item), requested, queue.size());
       
   618             queue.add(item);
       
   619             processingScheduler.deferOrSchedule(executor);
       
   620         }
       
   621 
       
   622         @Override
       
   623         public void onError(Throwable throwable) {
       
   624             System.out.println("EchoTube add " + throwable);
       
   625             queue.add(throwable);
       
   626             processingScheduler.deferOrSchedule(executor);
       
   627         }
       
   628 
       
   629         @Override
       
   630         public void onComplete() {
       
   631             System.out.println("EchoTube add EOF");
       
   632             queue.add(EOF);
       
   633             processingScheduler.deferOrSchedule(executor);
       
   634         }
       
   635 
       
   636         @Override
       
   637         public boolean isFinished() {
       
   638             return cancelled.get();
       
   639         }
       
   640 
       
   641         private class InternalSubscription implements Flow.Subscription {
       
   642 
       
   643             @Override
       
   644             public void request(long n) {
       
   645                 System.out.println("EchoTube got request: " + n);
       
   646                 if (n <= 0) {
       
   647                     throw new InternalError();
       
   648                 }
       
   649                 if (demand.increase(n)) {
       
   650                     processingScheduler.deferOrSchedule(executor);
       
   651                 }
       
   652             }
       
   653 
       
   654             @Override
       
   655             public void cancel() {
       
   656                 cancelled.set(true);
       
   657             }
       
   658         }
       
   659 
       
   660         @Override
       
   661         public String toString() {
       
   662             return "EchoTube";
       
   663         }
       
   664 
       
   665         int transmitted = 0;
       
   666         private SequentialScheduler.RestartableTask createProcessingTask() {
       
   667             return new SequentialScheduler.CompleteRestartableTask() {
       
   668 
       
   669                 @Override
       
   670                 protected void run() {
       
   671                     try {
       
   672                         while (!cancelled.get()) {
       
   673                             Object item = queue.peek();
       
   674                             if (item == null) {
       
   675                                 System.out.printf("EchoTube: queue empty, requested=%s, demand=%s, transmitted=%s%n",
       
   676                                         requested, demand.get(), transmitted);
       
   677                                 requestMore();
       
   678                                 return;
       
   679                             }
       
   680                             try {
       
   681                                 System.out.printf("EchoTube processing item, requested=%s, demand=%s, transmitted=%s%n",
       
   682                                         requested, demand.get(), transmitted);
       
   683                                 if (item instanceof List) {
       
   684                                     if (!demand.tryDecrement()) {
       
   685                                         System.out.println("EchoTube no demand");
       
   686                                         return;
       
   687                                     }
       
   688                                     @SuppressWarnings("unchecked")
       
   689                                     List<ByteBuffer> bytes = (List<ByteBuffer>) item;
       
   690                                     Object removed = queue.remove();
       
   691                                     assert removed == item;
       
   692                                     System.out.println("EchoTube processing "
       
   693                                             + Utils.remaining(bytes));
       
   694                                     transmitted++;
       
   695                                     subscriber.onNext(bytes);
       
   696                                     requestMore();
       
   697                                 } else if (item instanceof Throwable) {
       
   698                                     cancelled.set(true);
       
   699                                     Object removed = queue.remove();
       
   700                                     assert removed == item;
       
   701                                     System.out.println("EchoTube processing " + item);
       
   702                                     subscriber.onError((Throwable) item);
       
   703                                 } else if (item == EOF) {
       
   704                                     cancelled.set(true);
       
   705                                     Object removed = queue.remove();
       
   706                                     assert removed == item;
       
   707                                     System.out.println("EchoTube processing EOF");
       
   708                                     subscriber.onComplete();
       
   709                                 } else {
       
   710                                     throw new InternalError(String.valueOf(item));
       
   711                                 }
       
   712                             } finally {
       
   713                             }
       
   714                         }
       
   715                     } catch(Throwable t) {
       
   716                         t.printStackTrace();
       
   717                         throw t;
       
   718                     }
       
   719                 }
       
   720             };
       
   721         }
       
   722     }
   477 
   723 
   478     /**
   724     /**
   479      * The final subscriber which receives the decrypted looped-back data. Just
   725      * The final subscriber which receives the decrypted looped-back data. Just
   480      * needs to compare the data with what was sent. The given CF is either
   726      * needs to compare the data with what was sent. The given CF is either
   481      * completed exceptionally with an error or normally on success.
   727      * completed exceptionally with an error or normally on success.
   516         @Override
   762         @Override
   517         public void onNext(List<ByteBuffer> buffers) {
   763         public void onNext(List<ByteBuffer> buffers) {
   518             if (--unfulfilled == (REQUEST_WINDOW / 2)) {
   764             if (--unfulfilled == (REQUEST_WINDOW / 2)) {
   519                 long req = REQUEST_WINDOW - unfulfilled;
   765                 long req = REQUEST_WINDOW - unfulfilled;
   520                 System.out.println("EndSubscriber request " + req);
   766                 System.out.println("EndSubscriber request " + req);
       
   767                 unfulfilled = REQUEST_WINDOW;
   521                 subscription.request(req);
   768                 subscription.request(req);
   522                 unfulfilled = REQUEST_WINDOW;
       
   523             }
   769             }
   524 
   770 
   525             long currval = counter.get();
   771             long currval = counter.get();
   526             if (currval % 500 == 0) {
   772             if (currval % 500 == 0) {
   527                 System.out.println("End: " + currval);
   773                 System.out.println("EndSubscriber: " + currval);
   528             }
   774             }
   529             System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));
   775             System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));
   530 
   776 
   531             for (ByteBuffer buf : buffers) {
   777             for (ByteBuffer buf : buffers) {
   532                 while (buf.hasRemaining()) {
   778                 while (buf.hasRemaining()) {