test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/FlowTest.java
branch http-client-branch
changeset 55970 261d4d2f77e2
parent 55968 11a97b370db0
child 55973 4d9b002587db
equal deleted inserted replaced
55969:1da220f80a5d 55970:261d4d2f77e2
    40 import java.util.List;
    40 import java.util.List;
    41 import java.util.Random;
    41 import java.util.Random;
    42 import java.util.StringTokenizer;
    42 import java.util.StringTokenizer;
    43 import java.util.concurrent.BlockingQueue;
    43 import java.util.concurrent.BlockingQueue;
    44 import java.util.concurrent.CompletableFuture;
    44 import java.util.concurrent.CompletableFuture;
       
    45 import java.util.concurrent.CountDownLatch;
    45 import java.util.concurrent.ExecutorService;
    46 import java.util.concurrent.ExecutorService;
    46 import java.util.concurrent.Executors;
    47 import java.util.concurrent.Executors;
    47 import java.util.concurrent.Flow;
    48 import java.util.concurrent.Flow;
    48 import java.util.concurrent.Flow.Subscriber;
    49 import java.util.concurrent.Flow.Subscriber;
    49 import java.util.concurrent.LinkedBlockingQueue;
    50 import java.util.concurrent.LinkedBlockingQueue;
    66     private static final int LONGS_PER_BUF = 800;
    67     private static final int LONGS_PER_BUF = 800;
    67     static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
    68     static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
    68     public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
    69     public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
    69     static volatile String alpn;
    70     static volatile String alpn;
    70 
    71 
       
    72     // This is a hack to work around an issue with SubmissionPublisher.
       
    73     // SubmissionPublisher will call onComplete immediately without forwarding
       
    74     // remaining pending data if SubmissionPublisher.close() is called when
       
    75     // there is no demand. In other words, it doesn't wait for the subscriber
       
    76     // to pull all the data before calling onComplete.
       
    77     // We use a CountDownLatch to figure out when it is safe to call close().
       
    78     // This may cause the test to hang if data are buffered.
       
    79     final CountDownLatch allBytesReceived = new CountDownLatch(1);
       
    80 
    71     private final CompletableFuture<Void> completion;
    81     private final CompletableFuture<Void> completion;
    72 
    82 
    73     public FlowTest() throws IOException {
    83     public FlowTest() throws IOException {
    74         executor = Executors.newCachedThreadPool();
    84         executor = Executors.newCachedThreadPool();
    75         srcPublisher = new SubmissionPublisher<>(executor, 20,
    85         srcPublisher = new SubmissionPublisher<>(executor, 20,
    80         params.setApplicationProtocols(new String[]{"proto1", "proto2"}); // server will choose proto2
    90         params.setApplicationProtocols(new String[]{"proto1", "proto2"}); // server will choose proto2
    81         params.setProtocols(new String[]{"TLSv1.2"}); // TODO: This is essential. Needs to be protocol impl
    91         params.setProtocols(new String[]{"TLSv1.2"}); // TODO: This is essential. Needs to be protocol impl
    82         engineClient.setSSLParameters(params);
    92         engineClient.setSSLParameters(params);
    83         engineClient.setUseClientMode(true);
    93         engineClient.setUseClientMode(true);
    84         completion = new CompletableFuture<>();
    94         completion = new CompletableFuture<>();
    85         SSLLoopbackSubscriber looper = new SSLLoopbackSubscriber(ctx, executor);
    95         SSLLoopbackSubscriber looper = new SSLLoopbackSubscriber(ctx, executor, allBytesReceived);
    86         looper.start();
    96         looper.start();
    87         EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion);
    97         EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion, allBytesReceived);
    88         SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper);
    98         SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper);
    89         // going to measure how long handshake takes
    99         // going to measure how long handshake takes
    90         final long start = System.currentTimeMillis();
   100         final long start = System.currentTimeMillis();
    91         sslClient.alpn().whenComplete((String s, Throwable t) -> {
   101         sslClient.alpn().whenComplete((String s, Throwable t) -> {
    92             if (t != null)
   102             if (t != null)
   129             ByteBuffer b = getBuffer(count);
   139             ByteBuffer b = getBuffer(count);
   130             count += LONGS_PER_BUF;
   140             count += LONGS_PER_BUF;
   131             srcPublisher.submit(List.of(b));
   141             srcPublisher.submit(List.of(b));
   132         }
   142         }
   133         System.out.println("Finished submission. Waiting for loopback");
   143         System.out.println("Finished submission. Waiting for loopback");
       
   144         // make sure we don't wait for allBytesReceived in case of error.
       
   145         completion.whenComplete((r,t) -> allBytesReceived.countDown());
       
   146         try {
       
   147             allBytesReceived.await();
       
   148         } catch (InterruptedException e) {
       
   149             throw new RuntimeException(e);
       
   150         }
       
   151         System.out.println("All bytes received: ");
   134         srcPublisher.close();
   152         srcPublisher.close();
   135         try {
   153         try {
   136             completion.join();
   154             completion.join();
   137             if (!alpn.equals("proto2")) {
   155             if (!alpn.equals("proto2")) {
   138                 throw new RuntimeException("wrong alpn received");
   156                 throw new RuntimeException("wrong alpn received");
   170         private final Socket clientSock;
   188         private final Socket clientSock;
   171         private final SSLSocket serverSock;
   189         private final SSLSocket serverSock;
   172         private final Thread thread1, thread2, thread3;
   190         private final Thread thread1, thread2, thread3;
   173         private volatile Flow.Subscription clientSubscription;
   191         private volatile Flow.Subscription clientSubscription;
   174         private final SubmissionPublisher<List<ByteBuffer>> publisher;
   192         private final SubmissionPublisher<List<ByteBuffer>> publisher;
   175 
   193         private final CountDownLatch allBytesReceived;
   176         SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
   194 
       
   195         SSLLoopbackSubscriber(SSLContext ctx,
       
   196                               ExecutorService exec,
       
   197                               CountDownLatch allBytesReceived) throws IOException {
   177             SSLServerSocketFactory fac = ctx.getServerSocketFactory();
   198             SSLServerSocketFactory fac = ctx.getServerSocketFactory();
   178             SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
   199             SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
   179             SSLParameters params = serv.getSSLParameters();
   200             SSLParameters params = serv.getSSLParameters();
   180             params.setApplicationProtocols(new String[]{"proto2"});
   201             params.setApplicationProtocols(new String[]{"proto2"});
   181             serv.setSSLParameters(params);
   202             serv.setSSLParameters(params);
   183 
   204 
   184             int serverPort = serv.getLocalPort();
   205             int serverPort = serv.getLocalPort();
   185             clientSock = new Socket("127.0.0.1", serverPort);
   206             clientSock = new Socket("127.0.0.1", serverPort);
   186             serverSock = (SSLSocket) serv.accept();
   207             serverSock = (SSLSocket) serv.accept();
   187             this.buffer = new LinkedBlockingQueue<>();
   208             this.buffer = new LinkedBlockingQueue<>();
       
   209             this.allBytesReceived = allBytesReceived;
   188             thread1 = new Thread(this::clientWriter, "clientWriter");
   210             thread1 = new Thread(this::clientWriter, "clientWriter");
   189             thread2 = new Thread(this::serverLoopback, "serverLoopback");
   211             thread2 = new Thread(this::serverLoopback, "serverLoopback");
   190             thread3 = new Thread(this::clientReader, "clientReader");
   212             thread3 = new Thread(this::clientReader, "clientReader");
   191             publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(),
   213             publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(),
   192                     this::handlePublisherException);
   214                     this::handlePublisherException);
   216                     byte[] buf = new byte[bufsize];
   238                     byte[] buf = new byte[bufsize];
   217                     int n = is.read(buf);
   239                     int n = is.read(buf);
   218                     if (n == -1) {
   240                     if (n == -1) {
   219                         System.out.println("clientReader close: read "
   241                         System.out.println("clientReader close: read "
   220                                 + readCount.get() + " bytes");
   242                                 + readCount.get() + " bytes");
       
   243                         System.out.println("clientReader: got EOF. "
       
   244                                             + "Waiting signal to close publisher.");
       
   245                         allBytesReceived.await();
       
   246                         System.out.println("clientReader: closing publisher");
   221                         publisher.close();
   247                         publisher.close();
   222                         sleep(2000);
   248                         sleep(2000);
   223                         Utils.close(is, clientSock);
   249                         Utils.close(is, clientSock);
   224                         return;
   250                         return;
   225                     }
   251                     }
   359         private final long nbytes;
   385         private final long nbytes;
   360 
   386 
   361         private final AtomicLong counter;
   387         private final AtomicLong counter;
   362         private volatile Flow.Subscription subscription;
   388         private volatile Flow.Subscription subscription;
   363         private final CompletableFuture<Void> completion;
   389         private final CompletableFuture<Void> completion;
   364 
   390         private final CountDownLatch allBytesReceived;
   365         EndSubscriber(long nbytes, CompletableFuture<Void> completion) {
   391 
       
   392         EndSubscriber(long nbytes,
       
   393                       CompletableFuture<Void> completion,
       
   394                       CountDownLatch allBytesReceived) {
   366             counter = new AtomicLong(0);
   395             counter = new AtomicLong(0);
   367             this.nbytes = nbytes;
   396             this.nbytes = nbytes;
   368             this.completion = completion;
   397             this.completion = completion;
       
   398             this.allBytesReceived = allBytesReceived;
   369         }
   399         }
   370 
   400 
   371         @Override
   401         @Override
   372         public void onSubscribe(Flow.Subscription subscription) {
   402         public void onSubscribe(Flow.Subscription subscription) {
   373             this.subscription = subscription;
   403             this.subscription = subscription;
   406                 }
   436                 }
   407             }
   437             }
   408 
   438 
   409             counter.set(currval);
   439             counter.set(currval);
   410             subscription.request(1);
   440             subscription.request(1);
       
   441             if (currval >= TOTAL_LONGS) {
       
   442                 allBytesReceived.countDown();
       
   443             }
   411         }
   444         }
   412 
   445 
   413         @Override
   446         @Override
   414         public void onError(Throwable throwable) {
   447         public void onError(Throwable throwable) {
       
   448             allBytesReceived.countDown();
   415             completion.completeExceptionally(throwable);
   449             completion.completeExceptionally(throwable);
   416         }
   450         }
   417 
   451 
   418         @Override
   452         @Override
   419         public void onComplete() {
   453         public void onComplete() {
   421             if (n != nbytes) {
   455             if (n != nbytes) {
   422                 System.out.printf("nbytes=%d n=%d\n", nbytes, n);
   456                 System.out.printf("nbytes=%d n=%d\n", nbytes, n);
   423                 completion.completeExceptionally(new RuntimeException("ERROR AT END"));
   457                 completion.completeExceptionally(new RuntimeException("ERROR AT END"));
   424             } else {
   458             } else {
   425                 System.out.println("DONE OK: counter = " + n);
   459                 System.out.println("DONE OK: counter = " + n);
       
   460                 allBytesReceived.countDown();
   426                 completion.complete(null);
   461                 completion.complete(null);
   427             }
   462             }
   428         }
   463         }
   429     }
   464     }
   430 
   465