test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java
branch http-client-branch
changeset 55909 583695a0ed6a
parent 55763 634d8e14c172
child 55942 8d4770c22b63
equal deleted inserted replaced
55908:a36a236e55d8 55909:583695a0ed6a
    23 
    23 
    24 package jdk.incubator.http;
    24 package jdk.incubator.http;
    25 
    25 
    26 import jdk.incubator.http.internal.common.Demand;
    26 import jdk.incubator.http.internal.common.Demand;
    27 import jdk.incubator.http.internal.common.FlowTube;
    27 import jdk.incubator.http.internal.common.FlowTube;
       
    28 import jdk.incubator.http.internal.common.SSLFlowDelegate;
    28 import jdk.incubator.http.internal.common.SSLTube;
    29 import jdk.incubator.http.internal.common.SSLTube;
    29 import jdk.incubator.http.internal.common.SequentialScheduler;
    30 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    31 import jdk.incubator.http.internal.common.Utils;
    30 import org.testng.annotations.Test;
    32 import org.testng.annotations.Test;
    31 
    33 
    32 import javax.net.ssl.KeyManagerFactory;
    34 import javax.net.ssl.KeyManagerFactory;
    33 import javax.net.ssl.SSLContext;
    35 import javax.net.ssl.SSLContext;
    34 import javax.net.ssl.SSLEngine;
    36 import javax.net.ssl.SSLEngine;
    35 import javax.net.ssl.SSLParameters;
    37 import javax.net.ssl.SSLParameters;
       
    38 import javax.net.ssl.SSLServerSocket;
       
    39 import javax.net.ssl.SSLServerSocketFactory;
       
    40 import javax.net.ssl.SSLSocket;
    36 import javax.net.ssl.TrustManagerFactory;
    41 import javax.net.ssl.TrustManagerFactory;
       
    42 import java.io.BufferedOutputStream;
    37 import java.io.File;
    43 import java.io.File;
    38 import java.io.FileInputStream;
    44 import java.io.FileInputStream;
    39 import java.io.IOException;
    45 import java.io.IOException;
    40 import java.io.InputStream;
    46 import java.io.InputStream;
       
    47 import java.io.OutputStream;
       
    48 import java.net.Socket;
    41 import java.nio.ByteBuffer;
    49 import java.nio.ByteBuffer;
    42 import java.security.KeyManagementException;
    50 import java.security.KeyManagementException;
    43 import java.security.KeyStore;
    51 import java.security.KeyStore;
    44 import java.security.KeyStoreException;
    52 import java.security.KeyStoreException;
    45 import java.security.NoSuchAlgorithmException;
    53 import java.security.NoSuchAlgorithmException;
    46 import java.security.UnrecoverableKeyException;
    54 import java.security.UnrecoverableKeyException;
    47 import java.security.cert.CertificateException;
    55 import java.security.cert.CertificateException;
    48 import java.util.List;
    56 import java.util.List;
    49 import java.util.Queue;
    57 import java.util.Queue;
    50 import java.util.StringTokenizer;
    58 import java.util.StringTokenizer;
       
    59 import java.util.concurrent.BlockingQueue;
    51 import java.util.concurrent.CompletableFuture;
    60 import java.util.concurrent.CompletableFuture;
    52 import java.util.concurrent.ConcurrentLinkedQueue;
    61 import java.util.concurrent.ConcurrentLinkedQueue;
    53 import java.util.concurrent.Executor;
    62 import java.util.concurrent.Executor;
    54 import java.util.concurrent.ExecutorService;
    63 import java.util.concurrent.ExecutorService;
    55 import java.util.concurrent.Executors;
    64 import java.util.concurrent.Executors;
    56 import java.util.concurrent.Flow;
    65 import java.util.concurrent.Flow;
    57 import java.util.concurrent.ForkJoinPool;
    66 import java.util.concurrent.ForkJoinPool;
       
    67 import java.util.concurrent.LinkedBlockingQueue;
    58 import java.util.concurrent.SubmissionPublisher;
    68 import java.util.concurrent.SubmissionPublisher;
    59 import java.util.concurrent.atomic.AtomicBoolean;
    69 import java.util.concurrent.atomic.AtomicBoolean;
       
    70 import java.util.concurrent.atomic.AtomicInteger;
    60 import java.util.concurrent.atomic.AtomicLong;
    71 import java.util.concurrent.atomic.AtomicLong;
    61 
    72 
    62 @Test
    73 @Test
    63 public class SSLTubeTest {
    74 public class SSLTubeTest {
    64 
    75 
    78     @Test(timeOut = 30000)
    89     @Test(timeOut = 30000)
    79     public void run() throws IOException {
    90     public void run() throws IOException {
    80         /* Start of wiring */
    91         /* Start of wiring */
    81         ExecutorService sslExecutor = Executors.newCachedThreadPool();
    92         ExecutorService sslExecutor = Executors.newCachedThreadPool();
    82         /* Emulates an echo server */
    93         /* Emulates an echo server */
    83         FlowTube server = new SSLTube(createSSLEngine(false),
    94 //        FlowTube server = new SSLTube(createSSLEngine(false),
    84                                       sslExecutor,
    95 //                                      sslExecutor,
    85                                       new EchoTube(16));
    96 //                                      new EchoTube(16));
       
    97         SSLLoopbackSubscriber server =
       
    98                 new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
       
    99         server.start();
       
   100 
    86         FlowTube client = new SSLTube(createSSLEngine(true),
   101         FlowTube client = new SSLTube(createSSLEngine(true),
    87                                       sslExecutor,
   102                                       sslExecutor,
    88                                       server);
   103                                       server);
    89         SubmissionPublisher<List<ByteBuffer>> p =
   104         SubmissionPublisher<List<ByteBuffer>> p =
    90                 new SubmissionPublisher<>(ForkJoinPool.commonPool(),
   105                 new SubmissionPublisher<>(ForkJoinPool.commonPool(),
   111         } finally {
   126         } finally {
   112             sslExecutor.shutdownNow();
   127             sslExecutor.shutdownNow();
   113         }
   128         }
   114     }
   129     }
   115 
   130 
   116     private static final class EchoTube implements FlowTube {
   131     static class SSLLoopbackSubscriber implements FlowTube {
   117 
   132         private final BlockingQueue<ByteBuffer> buffer;
   118         private final static Object EOF = new Object();
   133         private final Socket clientSock;
   119         private final Executor executor = Executors.newSingleThreadExecutor();
   134         private final SSLSocket serverSock;
   120 
   135         private final Thread thread1, thread2, thread3;
   121         private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
   136         private volatile Flow.Subscription clientSubscription;
   122         private final int maxQueueSize;
   137         private final SubmissionPublisher<List<ByteBuffer>> publisher;
   123         private final SequentialScheduler processingScheduler =
   138 
   124                 new SequentialScheduler(createProcessingTask());
   139         SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
   125 
   140             SSLServerSocketFactory fac = ctx.getServerSocketFactory();
   126         /* Writing into this tube */
   141             SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
   127         private long unfulfilled;
   142             SSLParameters params = serv.getSSLParameters();
   128         private Flow.Subscription subscription;
   143             params.setApplicationProtocols(new String[]{"proto2"});
   129 
   144             serv.setSSLParameters(params);
   130         /* Reading from this tube */
   145 
   131         private final Demand demand = new Demand();
   146 
   132         private final AtomicBoolean cancelled = new AtomicBoolean();
   147             int serverPort = serv.getLocalPort();
   133         private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
   148             clientSock = new Socket("127.0.0.1", serverPort);
   134 
   149             serverSock = (SSLSocket) serv.accept();
   135         private EchoTube(int maxBufferSize) {
   150             this.buffer = new LinkedBlockingQueue<>();
   136             if (maxBufferSize < 1)
   151             thread1 = new Thread(this::clientWriter, "clientWriter");
   137                 throw new IllegalArgumentException();
   152             thread2 = new Thread(this::serverLoopback, "serverLoopback");
   138             this.maxQueueSize = maxBufferSize;
   153             thread3 = new Thread(this::clientReader, "clientReader");
   139         }
   154             publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(),
   140 
   155                     this::handlePublisherException);
   141         @Override
   156             SSLFlowDelegate.Monitor.add(this::monitor);
   142         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
   157         }
   143             this.subscriber = subscriber;
   158 
   144             this.subscriber.onSubscribe(new InternalSubscription());
   159         public void start() {
       
   160             thread1.start();
       
   161             thread2.start();
       
   162             thread3.start();
       
   163         }
       
   164 
       
   165         private void handlePublisherException(Object o, Throwable t) {
       
   166             System.out.println("Loopback Publisher exception");
       
   167             t.printStackTrace(System.out);
       
   168         }
       
   169 
       
   170         private final AtomicInteger readCount = new AtomicInteger();
       
   171 
       
   172         // reads off the SSLSocket the data from the "server"
       
   173         private void clientReader() {
       
   174             try {
       
   175                 InputStream is = clientSock.getInputStream();
       
   176                 final int bufsize = FlowTest.randomRange(512, 16 * 1024);
       
   177                 System.out.println("clientReader: bufsize = " + bufsize);
       
   178                 while (true) {
       
   179                     byte[] buf = new byte[bufsize];
       
   180                     int n = is.read(buf);
       
   181                     if (n == -1) {
       
   182                         System.out.println("clientReader close: read "
       
   183                                 + readCount.get() + " bytes");
       
   184                         publisher.close();
       
   185                         sleep(2000);
       
   186                         Utils.close(is, clientSock);
       
   187                         return;
       
   188                     }
       
   189                     ByteBuffer bb = ByteBuffer.wrap(buf, 0, n);
       
   190                     readCount.addAndGet(n);
       
   191                     publisher.submit(List.of(bb));
       
   192                 }
       
   193             } catch (Throwable e) {
       
   194                 e.printStackTrace();
       
   195                 Utils.close(clientSock);
       
   196             }
       
   197         }
       
   198 
       
   199         // writes the encrypted data from SSLFLowDelegate to the j.n.Socket
       
   200         // which is connected to the SSLSocket emulating a server.
       
   201         private void clientWriter() {
       
   202             long nbytes = 0;
       
   203             try {
       
   204                 OutputStream os =
       
   205                         new BufferedOutputStream(clientSock.getOutputStream());
       
   206 
       
   207                 while (true) {
       
   208                     ByteBuffer buf = buffer.take();
       
   209                     if (buf == FlowTest.SENTINEL) {
       
   210                         // finished
       
   211                         //Utils.sleep(2000);
       
   212                         System.out.println("clientWriter close: " + nbytes + " written");
       
   213                         clientSock.shutdownOutput();
       
   214                         System.out.println("clientWriter close return");
       
   215                         return;
       
   216                     }
       
   217                     int len = buf.remaining();
       
   218                     int written = writeToStream(os, buf);
       
   219                     assert len == written;
       
   220                     nbytes += len;
       
   221                     assert !buf.hasRemaining()
       
   222                             : "buffer has " + buf.remaining() + " bytes left";
       
   223                     clientSubscription.request(1);
       
   224                 }
       
   225             } catch (Throwable e) {
       
   226                 e.printStackTrace();
       
   227             }
       
   228         }
       
   229 
       
   230         private int writeToStream(OutputStream os, ByteBuffer buf) throws IOException {
       
   231             byte[] b = buf.array();
       
   232             int offset = buf.arrayOffset() + buf.position();
       
   233             int n = buf.limit() - buf.position();
       
   234             os.write(b, offset, n);
       
   235             buf.position(buf.limit());
       
   236             os.flush();
       
   237             return n;
       
   238         }
       
   239 
       
   240         private final AtomicInteger loopCount = new AtomicInteger();
       
   241 
       
   242         public String monitor() {
       
   243             return "serverLoopback: loopcount = " + loopCount.toString()
       
   244                     + " clientRead: count = " + readCount.toString();
       
   245         }
       
   246 
       
   247         // thread2
       
   248         private void serverLoopback() {
       
   249             try {
       
   250                 InputStream is = serverSock.getInputStream();
       
   251                 OutputStream os = serverSock.getOutputStream();
       
   252                 final int bufsize = FlowTest.randomRange(512, 16 * 1024);
       
   253                 System.out.println("serverLoopback: bufsize = " + bufsize);
       
   254                 byte[] bb = new byte[bufsize];
       
   255                 while (true) {
       
   256                     int n = is.read(bb);
       
   257                     if (n == -1) {
       
   258                         sleep(2000);
       
   259                         is.close();
       
   260                         os.close();
       
   261                         serverSock.close();
       
   262                         return;
       
   263                     }
       
   264                     os.write(bb, 0, n);
       
   265                     os.flush();
       
   266                     loopCount.addAndGet(n);
       
   267                 }
       
   268             } catch (Throwable e) {
       
   269                 e.printStackTrace();
       
   270             }
       
   271         }
       
   272 
       
   273 
       
   274         /**
       
   275          * This needs to be called before the chain is subscribed. It can't be
       
   276          * supplied in the constructor.
       
   277          */
       
   278         public void setReturnSubscriber(Flow.Subscriber<List<ByteBuffer>> returnSubscriber) {
       
   279             publisher.subscribe(returnSubscriber);
   145         }
   280         }
   146 
   281 
   147         @Override
   282         @Override
   148         public void onSubscribe(Flow.Subscription subscription) {
   283         public void onSubscribe(Flow.Subscription subscription) {
   149             unfulfilled = maxQueueSize;
   284             clientSubscription = subscription;
   150             (this.subscription = subscription).request(maxQueueSize);
   285             clientSubscription.request(5);
   151         }
   286         }
   152 
   287 
   153         @Override
   288         @Override
   154         public void onNext(List<ByteBuffer> item) {
   289         public void onNext(List<ByteBuffer> item) {
   155             if (--unfulfilled == (maxQueueSize / 2)) {
   290             try {
   156                 subscription.request(maxQueueSize - unfulfilled);
   291                 for (ByteBuffer b : item)
   157                 unfulfilled = maxQueueSize;
   292                     buffer.put(b);
   158             }
   293             } catch (InterruptedException e) {
   159             queue.add(item);
   294                 e.printStackTrace();
   160             processingScheduler.deferOrSchedule(executor);
   295                 Utils.close(clientSock);
       
   296             }
   161         }
   297         }
   162 
   298 
   163         @Override
   299         @Override
   164         public void onError(Throwable throwable) {
   300         public void onError(Throwable throwable) {
   165             queue.add(throwable);
   301             throwable.printStackTrace();
   166             processingScheduler.deferOrSchedule(executor);
   302             Utils.close(clientSock);
   167         }
   303         }
   168 
   304 
   169         @Override
   305         @Override
   170         public void onComplete() {
   306         public void onComplete() {
   171             queue.add(EOF);
   307             try {
   172             processingScheduler.deferOrSchedule(executor);
   308                 buffer.put(FlowTest.SENTINEL);
       
   309             } catch (InterruptedException e) {
       
   310                 e.printStackTrace();
       
   311                 Utils.close(clientSock);
       
   312             }
   173         }
   313         }
   174 
   314 
   175         @Override
   315         @Override
   176         public boolean isFinished() {
   316         public boolean isFinished() {
   177             return false;
   317             return false;
   178         }
   318         }
   179 
   319 
   180         private class InternalSubscription implements Flow.Subscription {
   320         @Override
   181 
   321         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
   182             @Override
   322             publisher.subscribe(subscriber);
   183             public void request(long n) {
       
   184                 if (n <= 0) {
       
   185                     throw new InternalError();
       
   186                 }
       
   187                 demand.increase(n);
       
   188                 processingScheduler.runOrSchedule();
       
   189             }
       
   190 
       
   191             @Override
       
   192             public void cancel() {
       
   193                 cancelled.set(true);
       
   194             }
       
   195         }
       
   196 
       
   197         private SequentialScheduler.RestartableTask createProcessingTask() {
       
   198             return new SequentialScheduler.CompleteRestartableTask() {
       
   199 
       
   200                 @Override
       
   201                 protected void run() {
       
   202                     while (!cancelled.get()) {
       
   203                         Object item = queue.peek();
       
   204                         if (item == null)
       
   205                             return;
       
   206                         try {
       
   207                             if (item instanceof List) {
       
   208                                 if (!demand.tryDecrement())
       
   209                                     return;
       
   210                                 @SuppressWarnings("unchecked")
       
   211                                 List<ByteBuffer> bytes = (List<ByteBuffer>) item;
       
   212                                 subscriber.onNext(bytes);
       
   213                             } else if (item instanceof Throwable) {
       
   214                                 cancelled.set(true);
       
   215                                 subscriber.onError((Throwable) item);
       
   216                             } else if (item == EOF) {
       
   217                                 cancelled.set(true);
       
   218                                 subscriber.onComplete();
       
   219                             } else {
       
   220                                 throw new InternalError(String.valueOf(item));
       
   221                             }
       
   222                         } finally {
       
   223                             Object removed = queue.remove();
       
   224                             assert removed == item;
       
   225                         }
       
   226                     }
       
   227                 }
       
   228             };
       
   229         }
   323         }
   230     }
   324     }
       
   325 
       
   326     private static void sleep(long millis) {
       
   327         try {
       
   328             Thread.sleep(millis);
       
   329         } catch (InterruptedException e) {
       
   330 
       
   331         }
       
   332     }
       
   333 //    private static final class EchoTube implements FlowTube {
       
   334 //
       
   335 //        private final static Object EOF = new Object();
       
   336 //        private final Executor executor = Executors.newSingleThreadExecutor();
       
   337 //
       
   338 //        private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
       
   339 //        private final int maxQueueSize;
       
   340 //        private final SequentialScheduler processingScheduler =
       
   341 //                new SequentialScheduler(createProcessingTask());
       
   342 //
       
   343 //        /* Writing into this tube */
       
   344 //        private long unfulfilled;
       
   345 //        private Flow.Subscription subscription;
       
   346 //
       
   347 //        /* Reading from this tube */
       
   348 //        private final Demand demand = new Demand();
       
   349 //        private final AtomicBoolean cancelled = new AtomicBoolean();
       
   350 //        private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
       
   351 //
       
   352 //        private EchoTube(int maxBufferSize) {
       
   353 //            if (maxBufferSize < 1)
       
   354 //                throw new IllegalArgumentException();
       
   355 //            this.maxQueueSize = maxBufferSize;
       
   356 //        }
       
   357 //
       
   358 //        @Override
       
   359 //        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   360 //            this.subscriber = subscriber;
       
   361 //            System.out.println("EchoTube got subscriber: " + subscriber);
       
   362 //            this.subscriber.onSubscribe(new InternalSubscription());
       
   363 //        }
       
   364 //
       
   365 //        @Override
       
   366 //        public void onSubscribe(Flow.Subscription subscription) {
       
   367 //            unfulfilled = maxQueueSize;
       
   368 //            System.out.println("EchoTube request: " + maxQueueSize);
       
   369 //            (this.subscription = subscription).request(maxQueueSize);
       
   370 //        }
       
   371 //
       
   372 //        @Override
       
   373 //        public void onNext(List<ByteBuffer> item) {
       
   374 //            if (--unfulfilled == (maxQueueSize / 2)) {
       
   375 //                long req = maxQueueSize - unfulfilled;
       
   376 //                subscription.request(req);
       
   377 //                System.out.println("EchoTube request: " + req);
       
   378 //                unfulfilled = maxQueueSize;
       
   379 //            }
       
   380 //            System.out.println("EchoTube add " + Utils.remaining(item));
       
   381 //            queue.add(item);
       
   382 //            processingScheduler.deferOrSchedule(executor);
       
   383 //        }
       
   384 //
       
   385 //        @Override
       
   386 //        public void onError(Throwable throwable) {
       
   387 //            System.out.println("EchoTube add " + throwable);
       
   388 //            queue.add(throwable);
       
   389 //            processingScheduler.deferOrSchedule(executor);
       
   390 //        }
       
   391 //
       
   392 //        @Override
       
   393 //        public void onComplete() {
       
   394 //            System.out.println("EchoTube add EOF");
       
   395 //            queue.add(EOF);
       
   396 //            processingScheduler.deferOrSchedule(executor);
       
   397 //        }
       
   398 //
       
   399 //        @Override
       
   400 //        public boolean isFinished() {
       
   401 //            return false;
       
   402 //        }
       
   403 //
       
   404 //        private class InternalSubscription implements Flow.Subscription {
       
   405 //
       
   406 //            @Override
       
   407 //            public void request(long n) {
       
   408 //                System.out.println("EchoTube got request: " + n);
       
   409 //                if (n <= 0) {
       
   410 //                    throw new InternalError();
       
   411 //                }
       
   412 //                demand.increase(n);
       
   413 //                processingScheduler.runOrSchedule();
       
   414 //            }
       
   415 //
       
   416 //            @Override
       
   417 //            public void cancel() {
       
   418 //                cancelled.set(true);
       
   419 //            }
       
   420 //        }
       
   421 //
       
   422 //        @Override
       
   423 //        public String toString() {
       
   424 //            return "EchoTube";
       
   425 //        }
       
   426 //
       
   427 //        private SequentialScheduler.RestartableTask createProcessingTask() {
       
   428 //            return new SequentialScheduler.CompleteRestartableTask() {
       
   429 //
       
   430 //                @Override
       
   431 //                protected void run() {
       
   432 //                    try {
       
   433 //                        while (!cancelled.get()) {
       
   434 //                            Object item = queue.peek();
       
   435 //                            if (item == null)
       
   436 //                                return;
       
   437 //                            try {
       
   438 //                                System.out.println("EchoTube processing item");
       
   439 //                                if (item instanceof List) {
       
   440 //                                    if (!demand.tryDecrement()) {
       
   441 //                                        System.out.println("EchoTube no demand");
       
   442 //                                        return;
       
   443 //                                    }
       
   444 //                                    @SuppressWarnings("unchecked")
       
   445 //                                    List<ByteBuffer> bytes = (List<ByteBuffer>) item;
       
   446 //                                    Object removed = queue.remove();
       
   447 //                                    assert removed == item;
       
   448 //                                    System.out.println("EchoTube processing "
       
   449 //                                            + Utils.remaining(bytes));
       
   450 //                                    subscriber.onNext(bytes);
       
   451 //                                } else if (item instanceof Throwable) {
       
   452 //                                    cancelled.set(true);
       
   453 //                                    Object removed = queue.remove();
       
   454 //                                    assert removed == item;
       
   455 //                                    System.out.println("EchoTube processing " + item);
       
   456 //                                    subscriber.onError((Throwable) item);
       
   457 //                                } else if (item == EOF) {
       
   458 //                                    cancelled.set(true);
       
   459 //                                    Object removed = queue.remove();
       
   460 //                                    assert removed == item;
       
   461 //                                    System.out.println("EchoTube processing EOF");
       
   462 //                                    subscriber.onComplete();
       
   463 //                                } else {
       
   464 //                                    throw new InternalError(String.valueOf(item));
       
   465 //                                }
       
   466 //                            } finally {
       
   467 //                            }
       
   468 //                        }
       
   469 //                    } catch(Throwable t) {
       
   470 //                        t.printStackTrace();
       
   471 //                        throw t;
       
   472 //                    }
       
   473 //                }
       
   474 //            };
       
   475 //        }
       
   476 //    }
   231 
   477 
   232     /**
   478     /**
   233      * The final subscriber which receives the decrypted looped-back data. Just
   479      * The final subscriber which receives the decrypted looped-back data. Just
   234      * needs to compare the data with what was sent. The given CF is either
   480      * needs to compare the data with what was sent. The given CF is either
   235      * completed exceptionally with an error or normally on success.
   481      * completed exceptionally with an error or normally on success.
   251 
   497 
   252         @Override
   498         @Override
   253         public void onSubscribe(Flow.Subscription subscription) {
   499         public void onSubscribe(Flow.Subscription subscription) {
   254             this.subscription = subscription;
   500             this.subscription = subscription;
   255             unfulfilled = REQUEST_WINDOW;
   501             unfulfilled = REQUEST_WINDOW;
       
   502             System.out.println("EndSubscriber request " + REQUEST_WINDOW);
   256             subscription.request(REQUEST_WINDOW);
   503             subscription.request(REQUEST_WINDOW);
   257         }
   504         }
   258 
   505 
   259         public static String info(List<ByteBuffer> i) {
   506         public static String info(List<ByteBuffer> i) {
   260             StringBuilder sb = new StringBuilder();
   507             StringBuilder sb = new StringBuilder();
   267         }
   514         }
   268 
   515 
   269         @Override
   516         @Override
   270         public void onNext(List<ByteBuffer> buffers) {
   517         public void onNext(List<ByteBuffer> buffers) {
   271             if (--unfulfilled == (REQUEST_WINDOW / 2)) {
   518             if (--unfulfilled == (REQUEST_WINDOW / 2)) {
   272                 subscription.request(REQUEST_WINDOW - unfulfilled);
   519                 long req = REQUEST_WINDOW - unfulfilled;
       
   520                 System.out.println("EndSubscriber request " + req);
       
   521                 subscription.request(req);
   273                 unfulfilled = REQUEST_WINDOW;
   522                 unfulfilled = REQUEST_WINDOW;
   274             }
   523             }
   275 
   524 
   276             long currval = counter.get();
   525             long currval = counter.get();
   277             if (currval % 500 == 0) {
   526             if (currval % 500 == 0) {
   278                 System.out.println("End: " + currval);
   527                 System.out.println("End: " + currval);
   279             }
   528             }
       
   529             System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));
   280 
   530 
   281             for (ByteBuffer buf : buffers) {
   531             for (ByteBuffer buf : buffers) {
   282                 while (buf.hasRemaining()) {
   532                 while (buf.hasRemaining()) {
   283                     long n = buf.getLong();
   533                     long n = buf.getLong();
   284                     if (currval > (SSLTubeTest.TOTAL_LONGS - 50)) {
   534                     if (currval > (SSLTubeTest.TOTAL_LONGS - 50)) {
   296             counter.set(currval);
   546             counter.set(currval);
   297         }
   547         }
   298 
   548 
   299         @Override
   549         @Override
   300         public void onError(Throwable throwable) {
   550         public void onError(Throwable throwable) {
       
   551             System.out.println("EndSubscriber onError " + throwable);
   301             completion.completeExceptionally(throwable);
   552             completion.completeExceptionally(throwable);
   302         }
   553         }
   303 
   554 
   304         @Override
   555         @Override
   305         public void onComplete() {
   556         public void onComplete() {
   309                 completion.completeExceptionally(new RuntimeException("ERROR AT END"));
   560                 completion.completeExceptionally(new RuntimeException("ERROR AT END"));
   310             } else {
   561             } else {
   311                 System.out.println("DONE OK");
   562                 System.out.println("DONE OK");
   312                 completion.complete(null);
   563                 completion.complete(null);
   313             }
   564             }
       
   565         }
       
   566         @Override
       
   567         public String toString() {
       
   568             return "EndSubscriber";
   314         }
   569         }
   315     }
   570     }
   316 
   571 
   317     private static SSLEngine createSSLEngine(boolean client) throws IOException {
   572     private static SSLEngine createSSLEngine(boolean client) throws IOException {
   318         SSLContext context = (new SimpleSSLContext()).get();
   573         SSLContext context = (new SimpleSSLContext()).get();