40 import java.util.List; |
40 import java.util.List; |
41 import java.util.Random; |
41 import java.util.Random; |
42 import java.util.StringTokenizer; |
42 import java.util.StringTokenizer; |
43 import java.util.concurrent.BlockingQueue; |
43 import java.util.concurrent.BlockingQueue; |
44 import java.util.concurrent.CompletableFuture; |
44 import java.util.concurrent.CompletableFuture; |
|
45 import java.util.concurrent.CountDownLatch; |
45 import java.util.concurrent.ExecutorService; |
46 import java.util.concurrent.ExecutorService; |
46 import java.util.concurrent.Executors; |
47 import java.util.concurrent.Executors; |
47 import java.util.concurrent.Flow; |
48 import java.util.concurrent.Flow; |
48 import java.util.concurrent.Flow.Subscriber; |
49 import java.util.concurrent.Flow.Subscriber; |
49 import java.util.concurrent.LinkedBlockingQueue; |
50 import java.util.concurrent.LinkedBlockingQueue; |
66 private static final int LONGS_PER_BUF = 800; |
67 private static final int LONGS_PER_BUF = 800; |
67 static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF; |
68 static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF; |
68 public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0); |
69 public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0); |
69 static volatile String alpn; |
70 static volatile String alpn; |
70 |
71 |
|
72 // This is a hack to work around an issue with SubmissionPublisher. |
|
73 // SubmissionPublisher will call onComplete immediately without forwarding |
|
74 // remaining pending data if SubmissionPublisher.close() is called when |
|
75 // there is no demand. In other words, it doesn't wait for the subscriber |
|
76 // to pull all the data before calling onComplete. |
|
77 // We use a CountDownLatch to figure out when it is safe to call close(). |
|
78 // This may cause the test to hang if some data stay buffered and never reach the end subscriber. |
|
79 final CountDownLatch allBytesReceived = new CountDownLatch(1); |
|
80 |
71 private final CompletableFuture<Void> completion; |
81 private final CompletableFuture<Void> completion; |
72 |
82 |
73 public FlowTest() throws IOException { |
83 public FlowTest() throws IOException { |
74 executor = Executors.newCachedThreadPool(); |
84 executor = Executors.newCachedThreadPool(); |
75 srcPublisher = new SubmissionPublisher<>(executor, 20, |
85 srcPublisher = new SubmissionPublisher<>(executor, 20, |
80 params.setApplicationProtocols(new String[]{"proto1", "proto2"}); // server will choose proto2 |
90 params.setApplicationProtocols(new String[]{"proto1", "proto2"}); // server will choose proto2 |
81 params.setProtocols(new String[]{"TLSv1.2"}); // TODO: This is essential. Needs to be protocol impl |
91 params.setProtocols(new String[]{"TLSv1.2"}); // TODO: This is essential. Needs to be protocol impl |
82 engineClient.setSSLParameters(params); |
92 engineClient.setSSLParameters(params); |
83 engineClient.setUseClientMode(true); |
93 engineClient.setUseClientMode(true); |
84 completion = new CompletableFuture<>(); |
94 completion = new CompletableFuture<>(); |
85 SSLLoopbackSubscriber looper = new SSLLoopbackSubscriber(ctx, executor); |
95 SSLLoopbackSubscriber looper = new SSLLoopbackSubscriber(ctx, executor, allBytesReceived); |
86 looper.start(); |
96 looper.start(); |
87 EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion); |
97 EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion, allBytesReceived); |
88 SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper); |
98 SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper); |
89 // going to measure how long handshake takes |
99 // going to measure how long handshake takes |
90 final long start = System.currentTimeMillis(); |
100 final long start = System.currentTimeMillis(); |
91 sslClient.alpn().whenComplete((String s, Throwable t) -> { |
101 sslClient.alpn().whenComplete((String s, Throwable t) -> { |
92 if (t != null) |
102 if (t != null) |
129 ByteBuffer b = getBuffer(count); |
139 ByteBuffer b = getBuffer(count); |
130 count += LONGS_PER_BUF; |
140 count += LONGS_PER_BUF; |
131 srcPublisher.submit(List.of(b)); |
141 srcPublisher.submit(List.of(b)); |
132 } |
142 } |
133 System.out.println("Finished submission. Waiting for loopback"); |
143 System.out.println("Finished submission. Waiting for loopback"); |
|
144 // make sure we don't wait for allBytesReceived in case of error. |
|
145 completion.whenComplete((r,t) -> allBytesReceived.countDown()); |
|
146 try { |
|
147 allBytesReceived.await(); |
|
148 } catch (InterruptedException e) { |
|
149 throw new RuntimeException(e); |
|
150 } |
|
151 System.out.println("All bytes received: "); |
134 srcPublisher.close(); |
152 srcPublisher.close(); |
135 try { |
153 try { |
136 completion.join(); |
154 completion.join(); |
137 if (!alpn.equals("proto2")) { |
155 if (!alpn.equals("proto2")) { |
138 throw new RuntimeException("wrong alpn received"); |
156 throw new RuntimeException("wrong alpn received"); |
170 private final Socket clientSock; |
188 private final Socket clientSock; |
171 private final SSLSocket serverSock; |
189 private final SSLSocket serverSock; |
172 private final Thread thread1, thread2, thread3; |
190 private final Thread thread1, thread2, thread3; |
173 private volatile Flow.Subscription clientSubscription; |
191 private volatile Flow.Subscription clientSubscription; |
174 private final SubmissionPublisher<List<ByteBuffer>> publisher; |
192 private final SubmissionPublisher<List<ByteBuffer>> publisher; |
175 |
193 private final CountDownLatch allBytesReceived; |
176 SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException { |
194 |
|
195 SSLLoopbackSubscriber(SSLContext ctx, |
|
196 ExecutorService exec, |
|
197 CountDownLatch allBytesReceived) throws IOException { |
177 SSLServerSocketFactory fac = ctx.getServerSocketFactory(); |
198 SSLServerSocketFactory fac = ctx.getServerSocketFactory(); |
178 SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0); |
199 SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0); |
179 SSLParameters params = serv.getSSLParameters(); |
200 SSLParameters params = serv.getSSLParameters(); |
180 params.setApplicationProtocols(new String[]{"proto2"}); |
201 params.setApplicationProtocols(new String[]{"proto2"}); |
181 serv.setSSLParameters(params); |
202 serv.setSSLParameters(params); |
183 |
204 |
184 int serverPort = serv.getLocalPort(); |
205 int serverPort = serv.getLocalPort(); |
185 clientSock = new Socket("127.0.0.1", serverPort); |
206 clientSock = new Socket("127.0.0.1", serverPort); |
186 serverSock = (SSLSocket) serv.accept(); |
207 serverSock = (SSLSocket) serv.accept(); |
187 this.buffer = new LinkedBlockingQueue<>(); |
208 this.buffer = new LinkedBlockingQueue<>(); |
|
209 this.allBytesReceived = allBytesReceived; |
188 thread1 = new Thread(this::clientWriter, "clientWriter"); |
210 thread1 = new Thread(this::clientWriter, "clientWriter"); |
189 thread2 = new Thread(this::serverLoopback, "serverLoopback"); |
211 thread2 = new Thread(this::serverLoopback, "serverLoopback"); |
190 thread3 = new Thread(this::clientReader, "clientReader"); |
212 thread3 = new Thread(this::clientReader, "clientReader"); |
191 publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(), |
213 publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(), |
192 this::handlePublisherException); |
214 this::handlePublisherException); |
216 byte[] buf = new byte[bufsize]; |
238 byte[] buf = new byte[bufsize]; |
217 int n = is.read(buf); |
239 int n = is.read(buf); |
218 if (n == -1) { |
240 if (n == -1) { |
219 System.out.println("clientReader close: read " |
241 System.out.println("clientReader close: read " |
220 + readCount.get() + " bytes"); |
242 + readCount.get() + " bytes"); |
|
243 System.out.println("clientReader: got EOF. " |
|
244 + "Waiting signal to close publisher."); |
|
245 allBytesReceived.await(); |
|
246 System.out.println("clientReader: closing publisher"); |
221 publisher.close(); |
247 publisher.close(); |
222 sleep(2000); |
248 sleep(2000); |
223 Utils.close(is, clientSock); |
249 Utils.close(is, clientSock); |
224 return; |
250 return; |
225 } |
251 } |
359 private final long nbytes; |
385 private final long nbytes; |
360 |
386 |
361 private final AtomicLong counter; |
387 private final AtomicLong counter; |
362 private volatile Flow.Subscription subscription; |
388 private volatile Flow.Subscription subscription; |
363 private final CompletableFuture<Void> completion; |
389 private final CompletableFuture<Void> completion; |
364 |
390 private final CountDownLatch allBytesReceived; |
365 EndSubscriber(long nbytes, CompletableFuture<Void> completion) { |
391 |
|
392 EndSubscriber(long nbytes, |
|
393 CompletableFuture<Void> completion, |
|
394 CountDownLatch allBytesReceived) { |
366 counter = new AtomicLong(0); |
395 counter = new AtomicLong(0); |
367 this.nbytes = nbytes; |
396 this.nbytes = nbytes; |
368 this.completion = completion; |
397 this.completion = completion; |
|
398 this.allBytesReceived = allBytesReceived; |
369 } |
399 } |
370 |
400 |
371 @Override |
401 @Override |
372 public void onSubscribe(Flow.Subscription subscription) { |
402 public void onSubscribe(Flow.Subscription subscription) { |
373 this.subscription = subscription; |
403 this.subscription = subscription; |
406 } |
436 } |
407 } |
437 } |
408 |
438 |
409 counter.set(currval); |
439 counter.set(currval); |
410 subscription.request(1); |
440 subscription.request(1); |
|
441 if (currval >= TOTAL_LONGS) { |
|
442 allBytesReceived.countDown(); |
|
443 } |
411 } |
444 } |
412 |
445 |
413 @Override |
446 @Override |
414 public void onError(Throwable throwable) { |
447 public void onError(Throwable throwable) { |
|
448 allBytesReceived.countDown(); |
415 completion.completeExceptionally(throwable); |
449 completion.completeExceptionally(throwable); |
416 } |
450 } |
417 |
451 |
418 @Override |
452 @Override |
419 public void onComplete() { |
453 public void onComplete() { |
421 if (n != nbytes) { |
455 if (n != nbytes) { |
422 System.out.printf("nbytes=%d n=%d\n", nbytes, n); |
456 System.out.printf("nbytes=%d n=%d\n", nbytes, n); |
423 completion.completeExceptionally(new RuntimeException("ERROR AT END")); |
457 completion.completeExceptionally(new RuntimeException("ERROR AT END")); |
424 } else { |
458 } else { |
425 System.out.println("DONE OK: counter = " + n); |
459 System.out.println("DONE OK: counter = " + n); |
|
460 allBytesReceived.countDown(); |
426 completion.complete(null); |
461 completion.complete(null); |
427 } |
462 } |
428 } |
463 } |
429 } |
464 } |
430 |
465 |