author | dfuchs |
Sat, 02 Dec 2017 17:40:57 +0000 | |
branch | http-client-branch |
changeset 55942 | 8d4770c22b63 |
parent 55909 | 583695a0ed6a |
child 55947 | c4f314605d28 |
permissions | -rw-r--r-- |
55763 | 1 |
/* |
2 |
* Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. |
|
3 |
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 |
* |
|
5 |
* This code is free software; you can redistribute it and/or modify it |
|
6 |
* under the terms of the GNU General Public License version 2 only, as |
|
7 |
* published by the Free Software Foundation. |
|
8 |
* |
|
9 |
* This code is distributed in the hope that it will be useful, but WITHOUT |
|
10 |
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
11 |
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
12 |
* version 2 for more details (a copy is included in the LICENSE file that |
|
13 |
* accompanied this code). |
|
14 |
* |
|
15 |
* You should have received a copy of the GNU General Public License version |
|
16 |
* 2 along with this work; if not, write to the Free Software Foundation, |
|
17 |
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
18 |
* |
|
19 |
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
20 |
* or visit www.oracle.com if you need additional information or have any |
|
21 |
* questions. |
|
22 |
*/ |
|
23 |
||
24 |
package jdk.incubator.http; |
|
25 |
||
26 |
import jdk.incubator.http.internal.common.Demand; |
|
27 |
import jdk.incubator.http.internal.common.FlowTube; |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
28 |
import jdk.incubator.http.internal.common.SSLFlowDelegate; |
55763 | 29 |
import jdk.incubator.http.internal.common.SSLTube; |
30 |
import jdk.incubator.http.internal.common.SequentialScheduler; |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
31 |
import jdk.incubator.http.internal.common.Utils; |
55763 | 32 |
import org.testng.annotations.Test; |
33 |
||
34 |
import javax.net.ssl.KeyManagerFactory; |
|
35 |
import javax.net.ssl.SSLContext; |
|
36 |
import javax.net.ssl.SSLEngine; |
|
37 |
import javax.net.ssl.SSLParameters; |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
38 |
import javax.net.ssl.SSLServerSocket; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
39 |
import javax.net.ssl.SSLServerSocketFactory; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
40 |
import javax.net.ssl.SSLSocket; |
55763 | 41 |
import javax.net.ssl.TrustManagerFactory; |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
42 |
import java.io.BufferedOutputStream; |
55763 | 43 |
import java.io.File; |
44 |
import java.io.FileInputStream; |
|
45 |
import java.io.IOException; |
|
46 |
import java.io.InputStream; |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
47 |
import java.io.OutputStream; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
48 |
import java.net.Socket; |
55763 | 49 |
import java.nio.ByteBuffer; |
50 |
import java.security.KeyManagementException; |
|
51 |
import java.security.KeyStore; |
|
52 |
import java.security.KeyStoreException; |
|
53 |
import java.security.NoSuchAlgorithmException; |
|
54 |
import java.security.UnrecoverableKeyException; |
|
55 |
import java.security.cert.CertificateException; |
|
56 |
import java.util.List; |
|
57 |
import java.util.Queue; |
|
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
58 |
import java.util.Random; |
55763 | 59 |
import java.util.StringTokenizer; |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
60 |
import java.util.concurrent.BlockingQueue; |
55763 | 61 |
import java.util.concurrent.CompletableFuture; |
62 |
import java.util.concurrent.ConcurrentLinkedQueue; |
|
63 |
import java.util.concurrent.Executor; |
|
64 |
import java.util.concurrent.ExecutorService; |
|
65 |
import java.util.concurrent.Executors; |
|
66 |
import java.util.concurrent.Flow; |
|
67 |
import java.util.concurrent.ForkJoinPool; |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
68 |
import java.util.concurrent.LinkedBlockingQueue; |
55763 | 69 |
import java.util.concurrent.SubmissionPublisher; |
70 |
import java.util.concurrent.atomic.AtomicBoolean; |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
71 |
import java.util.concurrent.atomic.AtomicInteger; |
55763 | 72 |
import java.util.concurrent.atomic.AtomicLong; |
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
73 |
import java.util.concurrent.atomic.AtomicReference; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
74 |
import java.util.function.Consumer; |
55763 | 75 |
|
76 |
@Test |
|
77 |
public class SSLTubeTest { |
|
78 |
||
79 |
private static final long COUNTER = 600; |
|
80 |
private static final int LONGS_PER_BUF = 800; |
|
81 |
private static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF; |
|
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
82 |
public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
83 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
84 |
static final Random rand = new Random(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
85 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
86 |
static int randomRange(int lower, int upper) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
87 |
if (lower > upper) |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
88 |
throw new IllegalArgumentException("lower > upper"); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
89 |
int diff = upper - lower; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
90 |
int r = lower + rand.nextInt(diff); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
91 |
return r - (r % 8); // round down to multiple of 8 (align for longs) |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
92 |
} |
55763 | 93 |
|
94 |
private static ByteBuffer getBuffer(long startingAt) { |
|
95 |
ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8); |
|
96 |
for (int j = 0; j < LONGS_PER_BUF; j++) { |
|
97 |
buf.putLong(startingAt++); |
|
98 |
} |
|
99 |
buf.flip(); |
|
100 |
return buf; |
|
101 |
} |
|
102 |
||
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
103 |
@Test |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
104 |
public void runWithSSLLoopackServer() throws IOException { |
55763 | 105 |
ExecutorService sslExecutor = Executors.newCachedThreadPool(); |
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
106 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
107 |
/* Start of wiring */ |
55763 | 108 |
/* Emulates an echo server */ |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
109 |
SSLLoopbackSubscriber server = |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
110 |
new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
111 |
server.start(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
112 |
|
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
113 |
run(server, sslExecutor); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
114 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
115 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
116 |
@Test |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
117 |
public void runWithEchoServer() throws IOException { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
118 |
ExecutorService sslExecutor = Executors.newCachedThreadPool(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
119 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
120 |
/* Start of wiring */ |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
121 |
/* Emulates an echo server */ |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
122 |
FlowTube server = crossOverEchoServer(sslExecutor); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
123 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
124 |
run(server, sslExecutor); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
125 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
126 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
127 |
private void run(FlowTube server, ExecutorService sslExecutor) throws IOException { |
55763 | 128 |
FlowTube client = new SSLTube(createSSLEngine(true), |
129 |
sslExecutor, |
|
130 |
server); |
|
131 |
SubmissionPublisher<List<ByteBuffer>> p = |
|
132 |
new SubmissionPublisher<>(ForkJoinPool.commonPool(), |
|
133 |
Integer.MAX_VALUE); |
|
134 |
FlowTube.TubePublisher begin = p::subscribe; |
|
135 |
CompletableFuture<Void> completion = new CompletableFuture<>(); |
|
136 |
EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion); |
|
137 |
client.connectFlows(begin, end); |
|
138 |
/* End of wiring */ |
|
139 |
||
140 |
long count = 0; |
|
141 |
System.out.printf("Submitting %d buffer arrays\n", COUNTER); |
|
142 |
System.out.printf("LoopCount should be %d\n", TOTAL_LONGS); |
|
143 |
for (long i = 0; i < COUNTER; i++) { |
|
144 |
ByteBuffer b = getBuffer(count); |
|
145 |
count += LONGS_PER_BUF; |
|
146 |
p.submit(List.of(b)); |
|
147 |
} |
|
148 |
System.out.println("Finished submission. Waiting for loopback"); |
|
149 |
p.close(); |
|
150 |
try { |
|
151 |
completion.join(); |
|
152 |
System.out.println("OK"); |
|
153 |
} finally { |
|
154 |
sslExecutor.shutdownNow(); |
|
155 |
} |
|
156 |
} |
|
157 |
||
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
158 |
/** |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
159 |
* This is a copy of the SSLLoopbackSubscriber used in FlowTest |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
160 |
*/ |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
161 |
static class SSLLoopbackSubscriber implements FlowTube { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
162 |
private final BlockingQueue<ByteBuffer> buffer; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
163 |
private final Socket clientSock; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
164 |
private final SSLSocket serverSock; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
165 |
private final Thread thread1, thread2, thread3; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
166 |
private volatile Flow.Subscription clientSubscription; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
167 |
private final SubmissionPublisher<List<ByteBuffer>> publisher; |
55763 | 168 |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
169 |
SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
170 |
SSLServerSocketFactory fac = ctx.getServerSocketFactory(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
171 |
SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
172 |
SSLParameters params = serv.getSSLParameters(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
173 |
params.setApplicationProtocols(new String[]{"proto2"}); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
174 |
serv.setSSLParameters(params); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
175 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
176 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
177 |
int serverPort = serv.getLocalPort(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
178 |
clientSock = new Socket("127.0.0.1", serverPort); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
179 |
serverSock = (SSLSocket) serv.accept(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
180 |
this.buffer = new LinkedBlockingQueue<>(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
181 |
thread1 = new Thread(this::clientWriter, "clientWriter"); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
182 |
thread2 = new Thread(this::serverLoopback, "serverLoopback"); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
183 |
thread3 = new Thread(this::clientReader, "clientReader"); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
184 |
publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(), |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
185 |
this::handlePublisherException); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
186 |
SSLFlowDelegate.Monitor.add(this::monitor); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
187 |
} |
55763 | 188 |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
189 |
public void start() { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
190 |
thread1.start(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
191 |
thread2.start(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
192 |
thread3.start(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
193 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
194 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
195 |
private void handlePublisherException(Object o, Throwable t) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
196 |
System.out.println("Loopback Publisher exception"); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
197 |
t.printStackTrace(System.out); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
198 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
199 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
200 |
private final AtomicInteger readCount = new AtomicInteger(); |
55763 | 201 |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
202 |
// reads off the SSLSocket the data from the "server" |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
203 |
private void clientReader() { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
204 |
try { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
205 |
InputStream is = clientSock.getInputStream(); |
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
206 |
final int bufsize = randomRange(512, 16 * 1024); |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
207 |
System.out.println("clientReader: bufsize = " + bufsize); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
208 |
while (true) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
209 |
byte[] buf = new byte[bufsize]; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
210 |
int n = is.read(buf); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
211 |
if (n == -1) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
212 |
System.out.println("clientReader close: read " |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
213 |
+ readCount.get() + " bytes"); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
214 |
publisher.close(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
215 |
sleep(2000); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
216 |
Utils.close(is, clientSock); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
217 |
return; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
218 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
219 |
ByteBuffer bb = ByteBuffer.wrap(buf, 0, n); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
220 |
readCount.addAndGet(n); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
221 |
publisher.submit(List.of(bb)); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
222 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
223 |
} catch (Throwable e) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
224 |
e.printStackTrace(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
225 |
Utils.close(clientSock); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
226 |
} |
55763 | 227 |
} |
228 |
||
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
229 |
// writes the encrypted data from SSLFLowDelegate to the j.n.Socket |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
230 |
// which is connected to the SSLSocket emulating a server. |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
231 |
private void clientWriter() { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
232 |
long nbytes = 0; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
233 |
try { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
234 |
OutputStream os = |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
235 |
new BufferedOutputStream(clientSock.getOutputStream()); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
236 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
237 |
while (true) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
238 |
ByteBuffer buf = buffer.take(); |
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
239 |
if (buf == SENTINEL) { |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
240 |
// finished |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
241 |
//Utils.sleep(2000); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
242 |
System.out.println("clientWriter close: " + nbytes + " written"); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
243 |
clientSock.shutdownOutput(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
244 |
System.out.println("clientWriter close return"); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
245 |
return; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
246 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
247 |
int len = buf.remaining(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
248 |
int written = writeToStream(os, buf); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
249 |
assert len == written; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
250 |
nbytes += len; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
251 |
assert !buf.hasRemaining() |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
252 |
: "buffer has " + buf.remaining() + " bytes left"; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
253 |
clientSubscription.request(1); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
254 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
255 |
} catch (Throwable e) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
256 |
e.printStackTrace(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
257 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
258 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
259 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
260 |
private int writeToStream(OutputStream os, ByteBuffer buf) throws IOException { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
261 |
byte[] b = buf.array(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
262 |
int offset = buf.arrayOffset() + buf.position(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
263 |
int n = buf.limit() - buf.position(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
264 |
os.write(b, offset, n); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
265 |
buf.position(buf.limit()); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
266 |
os.flush(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
267 |
return n; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
268 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
269 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
270 |
private final AtomicInteger loopCount = new AtomicInteger(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
271 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
272 |
public String monitor() { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
273 |
return "serverLoopback: loopcount = " + loopCount.toString() |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
274 |
+ " clientRead: count = " + readCount.toString(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
275 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
276 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
277 |
// thread2 |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
278 |
private void serverLoopback() { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
279 |
try { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
280 |
InputStream is = serverSock.getInputStream(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
281 |
OutputStream os = serverSock.getOutputStream(); |
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
282 |
final int bufsize = randomRange(512, 16 * 1024); |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
283 |
System.out.println("serverLoopback: bufsize = " + bufsize); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
284 |
byte[] bb = new byte[bufsize]; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
285 |
while (true) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
286 |
int n = is.read(bb); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
287 |
if (n == -1) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
288 |
sleep(2000); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
289 |
is.close(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
290 |
os.close(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
291 |
serverSock.close(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
292 |
return; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
293 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
294 |
os.write(bb, 0, n); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
295 |
os.flush(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
296 |
loopCount.addAndGet(n); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
297 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
298 |
} catch (Throwable e) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
299 |
e.printStackTrace(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
300 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
301 |
} |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
302 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
303 |
|
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
304 |
/** |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
305 |
* This needs to be called before the chain is subscribed. It can't be |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
306 |
* supplied in the constructor. |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
307 |
*/ |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
308 |
public void setReturnSubscriber(Flow.Subscriber<List<ByteBuffer>> returnSubscriber) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
309 |
publisher.subscribe(returnSubscriber); |
55763 | 310 |
} |
311 |
||
312 |
@Override |
|
313 |
public void onSubscribe(Flow.Subscription subscription) { |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
314 |
clientSubscription = subscription; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
315 |
clientSubscription.request(5); |
55763 | 316 |
} |
317 |
||
318 |
@Override |
|
319 |
public void onNext(List<ByteBuffer> item) { |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
320 |
try { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
321 |
for (ByteBuffer b : item) |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
322 |
buffer.put(b); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
323 |
} catch (InterruptedException e) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
324 |
e.printStackTrace(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
325 |
Utils.close(clientSock); |
55763 | 326 |
} |
327 |
} |
|
328 |
||
329 |
@Override |
|
330 |
public void onError(Throwable throwable) { |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
331 |
throwable.printStackTrace(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
332 |
Utils.close(clientSock); |
55763 | 333 |
} |
334 |
||
335 |
@Override |
|
336 |
public void onComplete() { |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
337 |
try { |
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
338 |
buffer.put(SENTINEL); |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
339 |
} catch (InterruptedException e) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
340 |
e.printStackTrace(); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
341 |
Utils.close(clientSock); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
342 |
} |
55763 | 343 |
} |
344 |
||
345 |
@Override |
|
346 |
public boolean isFinished() { |
|
347 |
return false; |
|
348 |
} |
|
349 |
||
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
350 |
@Override |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
351 |
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
352 |
publisher.subscribe(subscriber); |
55763 | 353 |
} |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
354 |
} |
55763 | 355 |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
356 |
private static void sleep(long millis) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
357 |
try { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
358 |
Thread.sleep(millis); |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
359 |
} catch (InterruptedException e) { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
360 |
|
55763 | 361 |
} |
362 |
} |
|
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
363 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
364 |
/** |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
365 |
* Creates a cross-over FlowTube than can be plugged into a client-side |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
366 |
* SSLTube (in place of the SSLLoopbackSubscriber). |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
367 |
* Note that the only method that can be called on the return tube |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
368 |
* is connectFlows(). Calling any other method will trigger an |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
369 |
* InternalError. |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
370 |
* @param sslExecutor an executor |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
371 |
* @return a cross-over FlowTube connected to an EchoTube. |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
372 |
* @throws IOException |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
373 |
*/ |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
374 |
FlowTube crossOverEchoServer(Executor sslExecutor) throws IOException { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
375 |
LateBindingTube crossOver = new LateBindingTube(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
376 |
FlowTube server = new SSLTube(createSSLEngine(false), |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
377 |
sslExecutor, |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
378 |
crossOver); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
379 |
EchoTube echo = new EchoTube(6); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
380 |
server.connectFlows(FlowTube.asTubePublisher(echo), FlowTube.asTubeSubscriber(echo)); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
381 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
382 |
return new CrossOverTube(crossOver); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
383 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
384 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
385 |
/** |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
386 |
* A cross-over FlowTube that makes it possible to reverse the direction |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
387 |
* of flows. The typical usage is to connect an two opposite SSLTube, |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
388 |
* one encrypting, one decrypting, to e.g. an EchoTube, with the help |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
389 |
* of a LateBindingTube: |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
390 |
* {@code |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
391 |
* client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
392 |
* } |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
393 |
* <p> |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
394 |
* Note that the only method that can be called on the CrossOverTube is |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
395 |
* connectFlows(). Calling any other method will cause an InternalError to |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
396 |
* be thrown. |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
397 |
* Also connectFlows() can be called only once. |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
398 |
*/ |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
399 |
private static final class CrossOverTube implements FlowTube { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
400 |
final LateBindingTube tube; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
401 |
CrossOverTube(LateBindingTube tube) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
402 |
this.tube = tube; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
403 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
404 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
405 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
406 |
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
407 |
throw newInternalError(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
408 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
409 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
410 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
411 |
public void connectFlows(TubePublisher writePublisher, TubeSubscriber readSubscriber) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
412 |
tube.start(writePublisher, readSubscriber); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
413 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
414 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
415 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
416 |
public boolean isFinished() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
417 |
return tube.isFinished(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
418 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
419 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
420 |
Error newInternalError() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
421 |
InternalError error = new InternalError(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
422 |
error.printStackTrace(System.out); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
423 |
return error; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
424 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
425 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
426 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
427 |
public void onSubscribe(Flow.Subscription subscription) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
428 |
throw newInternalError(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
429 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
430 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
431 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
432 |
public void onError(Throwable throwable) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
433 |
throw newInternalError(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
434 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
435 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
436 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
437 |
public void onComplete() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
438 |
throw newInternalError(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
439 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
440 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
441 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
442 |
public void onNext(List<ByteBuffer> item) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
443 |
throw newInternalError(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
444 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
445 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
446 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
447 |
/** |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
448 |
* A late binding tube that makes it possible to create an |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
449 |
* SSLTube before the right-hand-side tube has been created. |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
450 |
* The typical usage is to make it possible to connect two |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
451 |
* opposite SSLTube (one encrypting, one decrypting) through a |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
452 |
* CrossOverTube: |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
453 |
* {@code |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
454 |
* client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
455 |
* } |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
456 |
* <p> |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
457 |
* Note that this class only supports a single call to start(): it cannot be |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
458 |
* subscribed more than once from its left-hand-side (the cross over tube side). |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
459 |
*/ |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
460 |
private static class LateBindingTube implements FlowTube { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
461 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
462 |
final CompletableFuture<Flow.Publisher<List<ByteBuffer>>> futurePublisher |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
463 |
= new CompletableFuture<>(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
464 |
final ConcurrentLinkedQueue<Consumer<Flow.Subscriber<? super List<ByteBuffer>>>> queue |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
465 |
= new ConcurrentLinkedQueue<>(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
466 |
AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
467 |
SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
468 |
AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
469 |
private volatile boolean finished; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
470 |
private volatile boolean completed; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
471 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
472 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
473 |
public void start(Flow.Publisher<List<ByteBuffer>> publisher, |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
474 |
Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
475 |
subscriberRef.set(subscriber); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
476 |
futurePublisher.complete(publisher); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
477 |
scheduler.runOrSchedule(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
478 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
479 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
480 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
481 |
public boolean isFinished() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
482 |
return finished; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
483 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
484 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
485 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
486 |
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
487 |
futurePublisher.thenAccept((p) -> p.subscribe(subscriber)); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
488 |
scheduler.runOrSchedule(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
489 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
490 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
491 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
492 |
public void onSubscribe(Flow.Subscription subscription) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
493 |
queue.add((s) -> s.onSubscribe(subscription)); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
494 |
scheduler.runOrSchedule(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
495 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
496 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
497 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
498 |
public void onNext(List<ByteBuffer> item) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
499 |
queue.add((s) -> s.onNext(item)); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
500 |
scheduler.runOrSchedule(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
501 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
502 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
503 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
504 |
public void onError(Throwable throwable) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
505 |
System.out.println("LateBindingTube onError"); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
506 |
throwable.printStackTrace(System.out); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
507 |
queue.add((s) -> { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
508 |
errorRef.compareAndSet(null, throwable); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
509 |
try { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
510 |
System.out.println("LateBindingTube subscriber onError: " + throwable); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
511 |
s.onError(errorRef.get()); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
512 |
} finally { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
513 |
finished = true; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
514 |
System.out.println("LateBindingTube finished"); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
515 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
516 |
}); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
517 |
scheduler.runOrSchedule(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
518 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
519 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
520 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
521 |
public void onComplete() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
522 |
System.out.println("LateBindingTube completing"); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
523 |
queue.add((s) -> { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
524 |
completed = true; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
525 |
try { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
526 |
System.out.println("LateBindingTube complete subscriber"); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
527 |
s.onComplete(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
528 |
} finally { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
529 |
finished = true; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
530 |
System.out.println("LateBindingTube finished"); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
531 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
532 |
}); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
533 |
scheduler.runOrSchedule(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
534 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
535 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
536 |
private void loop() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
537 |
if (finished) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
538 |
scheduler.stop(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
539 |
return; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
540 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
541 |
Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
542 |
if (subscriber == null) return; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
543 |
try { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
544 |
Consumer<Flow.Subscriber<? super List<ByteBuffer>>> s; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
545 |
while ((s = queue.poll()) != null) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
546 |
s.accept(subscriber); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
547 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
548 |
} catch (Throwable t) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
549 |
if (errorRef.compareAndSet(null, t)) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
550 |
onError(t); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
551 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
552 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
553 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
554 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
555 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
556 |
/** |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
557 |
* An echo tube that just echoes back whatever bytes it receives. |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
558 |
* This cannot be plugged to the right-hand-side of an SSLTube |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
559 |
* since handshake data cannot be simply echoed back, and |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
560 |
* application data most likely also need to be decrypted and |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
561 |
* re-encrypted. |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
562 |
*/ |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
563 |
private static final class EchoTube implements FlowTube { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
564 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
565 |
private final static Object EOF = new Object(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
566 |
private final Executor executor = Executors.newSingleThreadExecutor(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
567 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
568 |
private final Queue<Object> queue = new ConcurrentLinkedQueue<>(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
569 |
private final int maxQueueSize; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
570 |
private final SequentialScheduler processingScheduler = |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
571 |
new SequentialScheduler(createProcessingTask()); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
572 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
573 |
/* Writing into this tube */ |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
574 |
private volatile long requested; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
575 |
private Flow.Subscription subscription; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
576 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
577 |
/* Reading from this tube */ |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
578 |
private final Demand demand = new Demand(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
579 |
private final AtomicBoolean cancelled = new AtomicBoolean(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
580 |
private Flow.Subscriber<? super List<ByteBuffer>> subscriber; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
581 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
582 |
private EchoTube(int maxBufferSize) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
583 |
if (maxBufferSize < 1) |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
584 |
throw new IllegalArgumentException(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
585 |
this.maxQueueSize = maxBufferSize; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
586 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
587 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
588 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
589 |
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
590 |
this.subscriber = subscriber; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
591 |
System.out.println("EchoTube got subscriber: " + subscriber); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
592 |
this.subscriber.onSubscribe(new InternalSubscription()); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
593 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
594 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
595 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
596 |
public void onSubscribe(Flow.Subscription subscription) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
597 |
System.out.println("EchoTube request: " + maxQueueSize); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
598 |
(this.subscription = subscription).request(requested = maxQueueSize); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
599 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
600 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
601 |
private void requestMore() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
602 |
Flow.Subscription s = subscription; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
603 |
if (s == null || cancelled.get()) return; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
604 |
long unfulfilled = queue.size() + --requested; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
605 |
if (unfulfilled <= maxQueueSize/2) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
606 |
long req = maxQueueSize - unfulfilled; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
607 |
requested += req; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
608 |
s.request(req); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
609 |
System.out.printf("EchoTube request: %s [requested:%s, queue:%s, unfulfilled:%s]%n", |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
610 |
req, requested-req, queue.size(), unfulfilled ); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
611 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
612 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
613 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
614 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
615 |
public void onNext(List<ByteBuffer> item) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
616 |
System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n", |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
617 |
Utils.remaining(item), requested, queue.size()); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
618 |
queue.add(item); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
619 |
processingScheduler.deferOrSchedule(executor); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
620 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
621 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
622 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
623 |
public void onError(Throwable throwable) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
624 |
System.out.println("EchoTube add " + throwable); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
625 |
queue.add(throwable); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
626 |
processingScheduler.deferOrSchedule(executor); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
627 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
628 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
629 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
630 |
public void onComplete() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
631 |
System.out.println("EchoTube add EOF"); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
632 |
queue.add(EOF); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
633 |
processingScheduler.deferOrSchedule(executor); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
634 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
635 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
636 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
637 |
public boolean isFinished() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
638 |
return cancelled.get(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
639 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
640 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
641 |
private class InternalSubscription implements Flow.Subscription { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
642 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
643 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
644 |
public void request(long n) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
645 |
System.out.println("EchoTube got request: " + n); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
646 |
if (n <= 0) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
647 |
throw new InternalError(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
648 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
649 |
if (demand.increase(n)) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
650 |
processingScheduler.deferOrSchedule(executor); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
651 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
652 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
653 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
654 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
655 |
public void cancel() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
656 |
cancelled.set(true); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
657 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
658 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
659 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
660 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
661 |
public String toString() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
662 |
return "EchoTube"; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
663 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
664 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
665 |
int transmitted = 0; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
666 |
private SequentialScheduler.RestartableTask createProcessingTask() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
667 |
return new SequentialScheduler.CompleteRestartableTask() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
668 |
|
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
669 |
@Override |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
670 |
protected void run() { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
671 |
try { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
672 |
while (!cancelled.get()) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
673 |
Object item = queue.peek(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
674 |
if (item == null) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
675 |
System.out.printf("EchoTube: queue empty, requested=%s, demand=%s, transmitted=%s%n", |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
676 |
requested, demand.get(), transmitted); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
677 |
requestMore(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
678 |
return; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
679 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
680 |
try { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
681 |
System.out.printf("EchoTube processing item, requested=%s, demand=%s, transmitted=%s%n", |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
682 |
requested, demand.get(), transmitted); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
683 |
if (item instanceof List) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
684 |
if (!demand.tryDecrement()) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
685 |
System.out.println("EchoTube no demand"); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
686 |
return; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
687 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
688 |
@SuppressWarnings("unchecked") |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
689 |
List<ByteBuffer> bytes = (List<ByteBuffer>) item; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
690 |
Object removed = queue.remove(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
691 |
assert removed == item; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
692 |
System.out.println("EchoTube processing " |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
693 |
+ Utils.remaining(bytes)); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
694 |
transmitted++; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
695 |
subscriber.onNext(bytes); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
696 |
requestMore(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
697 |
} else if (item instanceof Throwable) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
698 |
cancelled.set(true); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
699 |
Object removed = queue.remove(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
700 |
assert removed == item; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
701 |
System.out.println("EchoTube processing " + item); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
702 |
subscriber.onError((Throwable) item); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
703 |
} else if (item == EOF) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
704 |
cancelled.set(true); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
705 |
Object removed = queue.remove(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
706 |
assert removed == item; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
707 |
System.out.println("EchoTube processing EOF"); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
708 |
subscriber.onComplete(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
709 |
} else { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
710 |
throw new InternalError(String.valueOf(item)); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
711 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
712 |
} finally { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
713 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
714 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
715 |
} catch(Throwable t) { |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
716 |
t.printStackTrace(); |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
717 |
throw t; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
718 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
719 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
720 |
}; |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
721 |
} |
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
722 |
} |
55763 | 723 |
|
724 |
/** |
|
725 |
* The final subscriber which receives the decrypted looped-back data. Just |
|
726 |
* needs to compare the data with what was sent. The given CF is either |
|
727 |
* completed exceptionally with an error or normally on success. |
|
728 |
*/ |
|
729 |
private static class EndSubscriber implements FlowTube.TubeSubscriber { |
|
730 |
||
731 |
private static final int REQUEST_WINDOW = 13; |
|
732 |
||
733 |
private final long nbytes; |
|
734 |
private final AtomicLong counter = new AtomicLong(); |
|
735 |
private final CompletableFuture<?> completion; |
|
736 |
private volatile Flow.Subscription subscription; |
|
737 |
private long unfulfilled; |
|
738 |
||
739 |
EndSubscriber(long nbytes, CompletableFuture<?> completion) { |
|
740 |
this.nbytes = nbytes; |
|
741 |
this.completion = completion; |
|
742 |
} |
|
743 |
||
744 |
@Override |
|
745 |
public void onSubscribe(Flow.Subscription subscription) { |
|
746 |
this.subscription = subscription; |
|
747 |
unfulfilled = REQUEST_WINDOW; |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
748 |
System.out.println("EndSubscriber request " + REQUEST_WINDOW); |
55763 | 749 |
subscription.request(REQUEST_WINDOW); |
750 |
} |
|
751 |
||
752 |
public static String info(List<ByteBuffer> i) { |
|
753 |
StringBuilder sb = new StringBuilder(); |
|
754 |
sb.append("size: ").append(Integer.toString(i.size())); |
|
755 |
int x = 0; |
|
756 |
for (ByteBuffer b : i) |
|
757 |
x += b.remaining(); |
|
758 |
sb.append(" bytes: ").append(x); |
|
759 |
return sb.toString(); |
|
760 |
} |
|
761 |
||
762 |
@Override |
|
763 |
public void onNext(List<ByteBuffer> buffers) { |
|
764 |
if (--unfulfilled == (REQUEST_WINDOW / 2)) { |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
765 |
long req = REQUEST_WINDOW - unfulfilled; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
766 |
System.out.println("EndSubscriber request " + req); |
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
767 |
unfulfilled = REQUEST_WINDOW; |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
768 |
subscription.request(req); |
55763 | 769 |
} |
770 |
||
771 |
long currval = counter.get(); |
|
772 |
if (currval % 500 == 0) { |
|
55942
8d4770c22b63
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
dfuchs
parents:
55909
diff
changeset
|
773 |
System.out.println("EndSubscriber: " + currval); |
55763 | 774 |
} |
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
775 |
System.out.println("EndSubscriber onNext " + Utils.remaining(buffers)); |
55763 | 776 |
|
777 |
for (ByteBuffer buf : buffers) { |
|
778 |
while (buf.hasRemaining()) { |
|
779 |
long n = buf.getLong(); |
|
780 |
if (currval > (SSLTubeTest.TOTAL_LONGS - 50)) { |
|
781 |
System.out.println("End: " + currval); |
|
782 |
} |
|
783 |
if (n != currval++) { |
|
784 |
System.out.println("ERROR at " + n + " != " + (currval - 1)); |
|
785 |
completion.completeExceptionally(new RuntimeException("ERROR")); |
|
786 |
subscription.cancel(); |
|
787 |
return; |
|
788 |
} |
|
789 |
} |
|
790 |
} |
|
791 |
||
792 |
counter.set(currval); |
|
793 |
} |
|
794 |
||
795 |
@Override |
|
796 |
public void onError(Throwable throwable) { |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
797 |
System.out.println("EndSubscriber onError " + throwable); |
55763 | 798 |
completion.completeExceptionally(throwable); |
799 |
} |
|
800 |
||
801 |
@Override |
|
802 |
public void onComplete() { |
|
803 |
long n = counter.get(); |
|
804 |
if (n != nbytes) { |
|
805 |
System.out.printf("nbytes=%d n=%d\n", nbytes, n); |
|
806 |
completion.completeExceptionally(new RuntimeException("ERROR AT END")); |
|
807 |
} else { |
|
808 |
System.out.println("DONE OK"); |
|
809 |
completion.complete(null); |
|
810 |
} |
|
811 |
} |
|
55909
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
812 |
@Override |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
813 |
public String toString() { |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
814 |
return "EndSubscriber"; |
583695a0ed6a
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
dfuchs
parents:
55763
diff
changeset
|
815 |
} |
55763 | 816 |
} |
817 |
||
818 |
private static SSLEngine createSSLEngine(boolean client) throws IOException { |
|
819 |
SSLContext context = (new SimpleSSLContext()).get(); |
|
820 |
SSLEngine engine = context.createSSLEngine(); |
|
821 |
SSLParameters params = context.getSupportedSSLParameters(); |
|
822 |
params.setProtocols(new String[]{"TLSv1.2"}); // TODO: This is essential. Needs to be protocol impl |
|
823 |
if (client) { |
|
824 |
params.setApplicationProtocols(new String[]{"proto1", "proto2"}); // server will choose proto2 |
|
825 |
} else { |
|
826 |
params.setApplicationProtocols(new String[]{"proto2"}); // server will choose proto2 |
|
827 |
} |
|
828 |
engine.setSSLParameters(params); |
|
829 |
engine.setUseClientMode(client); |
|
830 |
return engine; |
|
831 |
} |
|
832 |
||
833 |
/** |
|
834 |
* Creates a simple usable SSLContext for SSLSocketFactory or a HttpsServer |
|
835 |
* using either a given keystore or a default one in the test tree. |
|
836 |
* |
|
837 |
* Using this class with a security manager requires the following |
|
838 |
* permissions to be granted: |
|
839 |
* |
|
840 |
* permission "java.util.PropertyPermission" "test.src.path", "read"; |
|
841 |
* permission java.io.FilePermission "${test.src}/../../../../lib/testlibrary/jdk/testlibrary/testkeys", |
|
842 |
* "read"; The exact path above depends on the location of the test. |
|
843 |
*/ |
|
844 |
private static class SimpleSSLContext { |
|
845 |
||
846 |
private final SSLContext ssl; |
|
847 |
||
848 |
/** |
|
849 |
* Loads default keystore from SimpleSSLContext source directory |
|
850 |
*/ |
|
851 |
public SimpleSSLContext() throws IOException { |
|
852 |
String paths = System.getProperty("test.src.path"); |
|
853 |
StringTokenizer st = new StringTokenizer(paths, File.pathSeparator); |
|
854 |
boolean securityExceptions = false; |
|
855 |
SSLContext sslContext = null; |
|
856 |
while (st.hasMoreTokens()) { |
|
857 |
String path = st.nextToken(); |
|
858 |
try { |
|
859 |
File f = new File(path, "../../../../lib/testlibrary/jdk/testlibrary/testkeys"); |
|
860 |
if (f.exists()) { |
|
861 |
try (FileInputStream fis = new FileInputStream(f)) { |
|
862 |
sslContext = init(fis); |
|
863 |
break; |
|
864 |
} |
|
865 |
} |
|
866 |
} catch (SecurityException e) { |
|
867 |
// catch and ignore because permission only required |
|
868 |
// for one entry on path (at most) |
|
869 |
securityExceptions = true; |
|
870 |
} |
|
871 |
} |
|
872 |
if (securityExceptions) { |
|
873 |
System.err.println("SecurityExceptions thrown on loading testkeys"); |
|
874 |
} |
|
875 |
ssl = sslContext; |
|
876 |
} |
|
877 |
||
878 |
private SSLContext init(InputStream i) throws IOException { |
|
879 |
try { |
|
880 |
char[] passphrase = "passphrase".toCharArray(); |
|
881 |
KeyStore ks = KeyStore.getInstance("JKS"); |
|
882 |
ks.load(i, passphrase); |
|
883 |
||
884 |
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); |
|
885 |
kmf.init(ks, passphrase); |
|
886 |
||
887 |
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); |
|
888 |
tmf.init(ks); |
|
889 |
||
890 |
SSLContext ssl = SSLContext.getInstance("TLS"); |
|
891 |
ssl.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); |
|
892 |
return ssl; |
|
893 |
} catch (KeyManagementException | KeyStoreException | |
|
894 |
UnrecoverableKeyException | CertificateException | |
|
895 |
NoSuchAlgorithmException e) { |
|
896 |
throw new RuntimeException(e.getMessage()); |
|
897 |
} |
|
898 |
} |
|
899 |
||
900 |
public SSLContext get() { |
|
901 |
return ssl; |
|
902 |
} |
|
903 |
} |
|
904 |
} |