test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/AbstractSSLTubeTest.java
branch http-client-branch
changeset 55970 261d4d2f77e2
parent 55968 11a97b370db0
child 55972 3fe2ae6d97a4
equal deleted inserted replaced
55969:1da220f80a5d 55970:261d4d2f77e2
    21  * questions.
    21  * questions.
    22  */
    22  */
    23 
    23 
    24 package jdk.incubator.http;
    24 package jdk.incubator.http;
    25 
    25 
    26 import jdk.incubator.http.internal.common.Demand;
       
    27 import jdk.incubator.http.internal.common.FlowTube;
    26 import jdk.incubator.http.internal.common.FlowTube;
    28 import jdk.incubator.http.internal.common.SSLFlowDelegate;
       
    29 import jdk.incubator.http.internal.common.SSLTube;
    27 import jdk.incubator.http.internal.common.SSLTube;
    30 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    31 import jdk.incubator.http.internal.common.Utils;
    28 import jdk.incubator.http.internal.common.Utils;
    32 import org.testng.annotations.Test;
    29 import org.testng.annotations.Test;
    33 
    30 
    34 import javax.net.ssl.KeyManagerFactory;
    31 import javax.net.ssl.KeyManagerFactory;
    35 import javax.net.ssl.SSLContext;
    32 import javax.net.ssl.SSLContext;
    36 import javax.net.ssl.SSLEngine;
    33 import javax.net.ssl.SSLEngine;
    37 import javax.net.ssl.SSLParameters;
    34 import javax.net.ssl.SSLParameters;
    38 import javax.net.ssl.SSLServerSocket;
       
    39 import javax.net.ssl.SSLServerSocketFactory;
       
    40 import javax.net.ssl.SSLSocket;
       
    41 import javax.net.ssl.TrustManagerFactory;
    35 import javax.net.ssl.TrustManagerFactory;
    42 import java.io.BufferedOutputStream;
       
    43 import java.io.File;
    36 import java.io.File;
    44 import java.io.FileInputStream;
    37 import java.io.FileInputStream;
    45 import java.io.IOException;
    38 import java.io.IOException;
    46 import java.io.InputStream;
    39 import java.io.InputStream;
    47 import java.io.OutputStream;
       
    48 import java.net.Socket;
       
    49 import java.nio.ByteBuffer;
    40 import java.nio.ByteBuffer;
    50 import java.security.KeyManagementException;
    41 import java.security.KeyManagementException;
    51 import java.security.KeyStore;
    42 import java.security.KeyStore;
    52 import java.security.KeyStoreException;
    43 import java.security.KeyStoreException;
    53 import java.security.NoSuchAlgorithmException;
    44 import java.security.NoSuchAlgorithmException;
    54 import java.security.UnrecoverableKeyException;
    45 import java.security.UnrecoverableKeyException;
    55 import java.security.cert.CertificateException;
    46 import java.security.cert.CertificateException;
    56 import java.util.List;
    47 import java.util.List;
    57 import java.util.Queue;
       
    58 import java.util.Random;
    48 import java.util.Random;
    59 import java.util.StringTokenizer;
    49 import java.util.StringTokenizer;
    60 import java.util.concurrent.BlockingQueue;
       
    61 import java.util.concurrent.CompletableFuture;
    50 import java.util.concurrent.CompletableFuture;
    62 import java.util.concurrent.ConcurrentLinkedQueue;
    51 import java.util.concurrent.CountDownLatch;
    63 import java.util.concurrent.Executor;
       
    64 import java.util.concurrent.ExecutorService;
    52 import java.util.concurrent.ExecutorService;
    65 import java.util.concurrent.Executors;
       
    66 import java.util.concurrent.Flow;
    53 import java.util.concurrent.Flow;
    67 import java.util.concurrent.ForkJoinPool;
    54 import java.util.concurrent.ForkJoinPool;
    68 import java.util.concurrent.LinkedBlockingQueue;
       
    69 import java.util.concurrent.SubmissionPublisher;
    55 import java.util.concurrent.SubmissionPublisher;
    70 import java.util.concurrent.atomic.AtomicBoolean;
       
    71 import java.util.concurrent.atomic.AtomicInteger;
       
    72 import java.util.concurrent.atomic.AtomicLong;
    56 import java.util.concurrent.atomic.AtomicLong;
    73 import java.util.concurrent.atomic.AtomicReference;
       
    74 import java.util.function.Consumer;
       
    75 
    57 
    76 public class AbstractSSLTubeTest extends AbstractRandomTest {
    58 public class AbstractSSLTubeTest extends AbstractRandomTest {
    77 
    59 
    78     public static final long COUNTER = 600;
    60     public static final long COUNTER = 600;
    79     public static final int LONGS_PER_BUF = 800;
    61     public static final int LONGS_PER_BUF = 800;
    80     public static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
    62     public static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
    81     public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
    63     public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
       
    64     // This is a hack to work around an issue with SubmissionPublisher.
       
    65     // SubmissionPublisher will call onComplete immediately without forwarding
       
    66     // remaining pending data if SubmissionPublisher.close() is called when
       
    67     // there is no demand. In other words, it doesn't wait for the subscriber
       
    68     // to pull all the data before calling onComplete.
       
    69     // We use a CountDownLatch to figure out when it is safe to call close().
       
    70     // This may cause the test to hang if data are buffered.
       
    71     protected final CountDownLatch allBytesReceived = new CountDownLatch(1);
       
    72 
    82 
    73 
    83     protected static ByteBuffer getBuffer(long startingAt) {
    74     protected static ByteBuffer getBuffer(long startingAt) {
    84         ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8);
    75         ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8);
    85         for (int j = 0; j < LONGS_PER_BUF; j++) {
    76         for (int j = 0; j < LONGS_PER_BUF; j++) {
    86             buf.putLong(startingAt++);
    77             buf.putLong(startingAt++);
    87         }
    78         }
    88         buf.flip();
    79         buf.flip();
    89         return buf;
    80         return buf;
    90     }
    81     }
    91 
    82 
    92     protected void run(FlowTube server, ExecutorService sslExecutor) throws IOException {
    83     protected void run(FlowTube server,
       
    84                        ExecutorService sslExecutor,
       
    85                        CountDownLatch allBytesReceived) throws IOException {
    93         FlowTube client = new SSLTube(createSSLEngine(true),
    86         FlowTube client = new SSLTube(createSSLEngine(true),
    94                                       sslExecutor,
    87                                       sslExecutor,
    95                                       server);
    88                                       server);
    96         SubmissionPublisher<List<ByteBuffer>> p =
    89         SubmissionPublisher<List<ByteBuffer>> p =
    97                 new SubmissionPublisher<>(ForkJoinPool.commonPool(),
    90                 new SubmissionPublisher<>(ForkJoinPool.commonPool(),
    98                                           Integer.MAX_VALUE);
    91                                           Integer.MAX_VALUE);
    99         FlowTube.TubePublisher begin = p::subscribe;
    92         FlowTube.TubePublisher begin = p::subscribe;
   100         CompletableFuture<Void> completion = new CompletableFuture<>();
    93         CompletableFuture<Void> completion = new CompletableFuture<>();
   101         EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion);
    94         EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion, allBytesReceived);
   102         client.connectFlows(begin, end);
    95         client.connectFlows(begin, end);
   103         /* End of wiring */
    96         /* End of wiring */
   104 
    97 
   105         long count = 0;
    98         long count = 0;
   106         System.out.printf("Submitting %d buffer arrays\n", COUNTER);
    99         System.out.printf("Submitting %d buffer arrays\n", COUNTER);
   109             ByteBuffer b = getBuffer(count);
   102             ByteBuffer b = getBuffer(count);
   110             count += LONGS_PER_BUF;
   103             count += LONGS_PER_BUF;
   111             p.submit(List.of(b));
   104             p.submit(List.of(b));
   112         }
   105         }
   113         System.out.println("Finished submission. Waiting for loopback");
   106         System.out.println("Finished submission. Waiting for loopback");
       
   107         completion.whenComplete((r,t) -> allBytesReceived.countDown());
       
   108         try {
       
   109             allBytesReceived.await();
       
   110         } catch (InterruptedException e) {
       
   111             throw new IOException(e);
       
   112         }
   114         p.close();
   113         p.close();
       
   114         System.out.println("All bytes received: calling publisher.close()");
   115         try {
   115         try {
   116             completion.join();
   116             completion.join();
   117             System.out.println("OK");
   117             System.out.println("OK");
   118         } finally {
   118         } finally {
   119             sslExecutor.shutdownNow();
   119             sslExecutor.shutdownNow();
   138         private static final int REQUEST_WINDOW = 13;
   138         private static final int REQUEST_WINDOW = 13;
   139 
   139 
   140         private final long nbytes;
   140         private final long nbytes;
   141         private final AtomicLong counter = new AtomicLong();
   141         private final AtomicLong counter = new AtomicLong();
   142         private final CompletableFuture<?> completion;
   142         private final CompletableFuture<?> completion;
       
   143         private final CountDownLatch allBytesReceived;
   143         private volatile Flow.Subscription subscription;
   144         private volatile Flow.Subscription subscription;
   144         private long unfulfilled;
   145         private long unfulfilled;
   145 
   146 
   146         EndSubscriber(long nbytes, CompletableFuture<?> completion) {
   147         EndSubscriber(long nbytes, CompletableFuture<?> completion,
       
   148                       CountDownLatch allBytesReceived) {
   147             this.nbytes = nbytes;
   149             this.nbytes = nbytes;
   148             this.completion = completion;
   150             this.completion = completion;
       
   151             this.allBytesReceived = allBytesReceived;
   149         }
   152         }
   150 
   153 
   151         @Override
   154         @Override
   152         public void onSubscribe(Flow.Subscription subscription) {
   155         public void onSubscribe(Flow.Subscription subscription) {
   153             this.subscription = subscription;
   156             this.subscription = subscription;
   182             System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));
   185             System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));
   183 
   186 
   184             for (ByteBuffer buf : buffers) {
   187             for (ByteBuffer buf : buffers) {
   185                 while (buf.hasRemaining()) {
   188                 while (buf.hasRemaining()) {
   186                     long n = buf.getLong();
   189                     long n = buf.getLong();
   187                     if (currval > (AbstractSSLTubeTest.TOTAL_LONGS - 50)) {
   190                     if (currval > (TOTAL_LONGS - 50)) {
   188                         System.out.println("End: " + currval);
   191                         System.out.println("End: " + currval);
   189                     }
   192                     }
   190                     if (n != currval++) {
   193                     if (n != currval++) {
   191                         System.out.println("ERROR at " + n + " != " + (currval - 1));
   194                         System.out.println("ERROR at " + n + " != " + (currval - 1));
   192                         completion.completeExceptionally(new RuntimeException("ERROR"));
   195                         completion.completeExceptionally(new RuntimeException("ERROR"));
   195                     }
   198                     }
   196                 }
   199                 }
   197             }
   200             }
   198 
   201 
   199             counter.set(currval);
   202             counter.set(currval);
       
   203             if (currval >= TOTAL_LONGS) {
       
   204                 allBytesReceived.countDown();
       
   205             }
   200         }
   206         }
   201 
   207 
   202         @Override
   208         @Override
   203         public void onError(Throwable throwable) {
   209         public void onError(Throwable throwable) {
   204             System.out.println("EndSubscriber onError " + throwable);
   210             System.out.println("EndSubscriber onError " + throwable);
   205             completion.completeExceptionally(throwable);
   211             completion.completeExceptionally(throwable);
       
   212             allBytesReceived.countDown();
   206         }
   213         }
   207 
   214 
   208         @Override
   215         @Override
   209         public void onComplete() {
   216         public void onComplete() {
   210             long n = counter.get();
   217             long n = counter.get();
   213                 completion.completeExceptionally(new RuntimeException("ERROR AT END"));
   220                 completion.completeExceptionally(new RuntimeException("ERROR AT END"));
   214             } else {
   221             } else {
   215                 System.out.println("DONE OK");
   222                 System.out.println("DONE OK");
   216                 completion.complete(null);
   223                 completion.complete(null);
   217             }
   224             }
   218         }
   225             allBytesReceived.countDown();
       
   226         }
       
   227         
   219         @Override
   228         @Override
   220         public String toString() {
   229         public String toString() {
   221             return "EndSubscriber";
   230             return "EndSubscriber";
   222         }
   231         }
   223     }
   232     }