21 * questions. |
21 * questions. |
22 */ |
22 */ |
23 |
23 |
24 package jdk.incubator.http; |
24 package jdk.incubator.http; |
25 |
25 |
26 import jdk.incubator.http.internal.common.Demand; |
|
27 import jdk.incubator.http.internal.common.FlowTube; |
26 import jdk.incubator.http.internal.common.FlowTube; |
28 import jdk.incubator.http.internal.common.SSLFlowDelegate; |
|
29 import jdk.incubator.http.internal.common.SSLTube; |
27 import jdk.incubator.http.internal.common.SSLTube; |
30 import jdk.incubator.http.internal.common.SequentialScheduler; |
|
31 import jdk.incubator.http.internal.common.Utils; |
28 import jdk.incubator.http.internal.common.Utils; |
32 import org.testng.annotations.Test; |
29 import org.testng.annotations.Test; |
33 |
30 |
34 import javax.net.ssl.KeyManagerFactory; |
31 import javax.net.ssl.KeyManagerFactory; |
35 import javax.net.ssl.SSLContext; |
32 import javax.net.ssl.SSLContext; |
36 import javax.net.ssl.SSLEngine; |
33 import javax.net.ssl.SSLEngine; |
37 import javax.net.ssl.SSLParameters; |
34 import javax.net.ssl.SSLParameters; |
38 import javax.net.ssl.SSLServerSocket; |
|
39 import javax.net.ssl.SSLServerSocketFactory; |
|
40 import javax.net.ssl.SSLSocket; |
|
41 import javax.net.ssl.TrustManagerFactory; |
35 import javax.net.ssl.TrustManagerFactory; |
42 import java.io.BufferedOutputStream; |
|
43 import java.io.File; |
36 import java.io.File; |
44 import java.io.FileInputStream; |
37 import java.io.FileInputStream; |
45 import java.io.IOException; |
38 import java.io.IOException; |
46 import java.io.InputStream; |
39 import java.io.InputStream; |
47 import java.io.OutputStream; |
|
48 import java.net.Socket; |
|
49 import java.nio.ByteBuffer; |
40 import java.nio.ByteBuffer; |
50 import java.security.KeyManagementException; |
41 import java.security.KeyManagementException; |
51 import java.security.KeyStore; |
42 import java.security.KeyStore; |
52 import java.security.KeyStoreException; |
43 import java.security.KeyStoreException; |
53 import java.security.NoSuchAlgorithmException; |
44 import java.security.NoSuchAlgorithmException; |
54 import java.security.UnrecoverableKeyException; |
45 import java.security.UnrecoverableKeyException; |
55 import java.security.cert.CertificateException; |
46 import java.security.cert.CertificateException; |
56 import java.util.List; |
47 import java.util.List; |
57 import java.util.Queue; |
|
58 import java.util.Random; |
48 import java.util.Random; |
59 import java.util.StringTokenizer; |
49 import java.util.StringTokenizer; |
60 import java.util.concurrent.BlockingQueue; |
|
61 import java.util.concurrent.CompletableFuture; |
50 import java.util.concurrent.CompletableFuture; |
62 import java.util.concurrent.ConcurrentLinkedQueue; |
51 import java.util.concurrent.CountDownLatch; |
63 import java.util.concurrent.Executor; |
|
64 import java.util.concurrent.ExecutorService; |
52 import java.util.concurrent.ExecutorService; |
65 import java.util.concurrent.Executors; |
|
66 import java.util.concurrent.Flow; |
53 import java.util.concurrent.Flow; |
67 import java.util.concurrent.ForkJoinPool; |
54 import java.util.concurrent.ForkJoinPool; |
68 import java.util.concurrent.LinkedBlockingQueue; |
|
69 import java.util.concurrent.SubmissionPublisher; |
55 import java.util.concurrent.SubmissionPublisher; |
70 import java.util.concurrent.atomic.AtomicBoolean; |
|
71 import java.util.concurrent.atomic.AtomicInteger; |
|
72 import java.util.concurrent.atomic.AtomicLong; |
56 import java.util.concurrent.atomic.AtomicLong; |
73 import java.util.concurrent.atomic.AtomicReference; |
|
74 import java.util.function.Consumer; |
|
75 |
57 |
76 public class AbstractSSLTubeTest extends AbstractRandomTest { |
58 public class AbstractSSLTubeTest extends AbstractRandomTest { |
77 |
59 |
78 public static final long COUNTER = 600; |
60 public static final long COUNTER = 600; |
79 public static final int LONGS_PER_BUF = 800; |
61 public static final int LONGS_PER_BUF = 800; |
80 public static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF; |
62 public static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF; |
81 public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0); |
63 public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0); |
|
64 // This is a hack to work around an issue with SubmissionPublisher. |
|
65 // SubmissionPublisher will call onComplete immediately without forwarding |
|
66 // remaining pending data if SubmissionPublisher.close() is called when |
|
67 // there is no demand. In other words, it doesn't wait for the subscriber |
|
68 // to pull all the data before calling onComplete. |
|
69 // We use a CountDownLatch to figure out when it is safe to call close(). |
|
70 // This may cause the test to hang if data are buffered. |
|
71 protected final CountDownLatch allBytesReceived = new CountDownLatch(1); |
|
72 |
82 |
73 |
83 protected static ByteBuffer getBuffer(long startingAt) { |
74 protected static ByteBuffer getBuffer(long startingAt) { |
84 ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8); |
75 ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8); |
85 for (int j = 0; j < LONGS_PER_BUF; j++) { |
76 for (int j = 0; j < LONGS_PER_BUF; j++) { |
86 buf.putLong(startingAt++); |
77 buf.putLong(startingAt++); |
87 } |
78 } |
88 buf.flip(); |
79 buf.flip(); |
89 return buf; |
80 return buf; |
90 } |
81 } |
91 |
82 |
92 protected void run(FlowTube server, ExecutorService sslExecutor) throws IOException { |
83 protected void run(FlowTube server, |
|
84 ExecutorService sslExecutor, |
|
85 CountDownLatch allBytesReceived) throws IOException { |
93 FlowTube client = new SSLTube(createSSLEngine(true), |
86 FlowTube client = new SSLTube(createSSLEngine(true), |
94 sslExecutor, |
87 sslExecutor, |
95 server); |
88 server); |
96 SubmissionPublisher<List<ByteBuffer>> p = |
89 SubmissionPublisher<List<ByteBuffer>> p = |
97 new SubmissionPublisher<>(ForkJoinPool.commonPool(), |
90 new SubmissionPublisher<>(ForkJoinPool.commonPool(), |
98 Integer.MAX_VALUE); |
91 Integer.MAX_VALUE); |
99 FlowTube.TubePublisher begin = p::subscribe; |
92 FlowTube.TubePublisher begin = p::subscribe; |
100 CompletableFuture<Void> completion = new CompletableFuture<>(); |
93 CompletableFuture<Void> completion = new CompletableFuture<>(); |
101 EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion); |
94 EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion, allBytesReceived); |
102 client.connectFlows(begin, end); |
95 client.connectFlows(begin, end); |
103 /* End of wiring */ |
96 /* End of wiring */ |
104 |
97 |
105 long count = 0; |
98 long count = 0; |
106 System.out.printf("Submitting %d buffer arrays\n", COUNTER); |
99 System.out.printf("Submitting %d buffer arrays\n", COUNTER); |