51 import java.util.concurrent.CompletableFuture; |
51 import java.util.concurrent.CompletableFuture; |
52 import java.util.concurrent.CompletionException; |
52 import java.util.concurrent.CompletionException; |
53 import java.util.concurrent.CountDownLatch; |
53 import java.util.concurrent.CountDownLatch; |
54 import java.util.concurrent.Executors; |
54 import java.util.concurrent.Executors; |
55 import java.util.concurrent.ExecutorService; |
55 import java.util.concurrent.ExecutorService; |
|
56 import java.util.concurrent.Semaphore; |
56 import javax.net.ssl.SSLContext; |
57 import javax.net.ssl.SSLContext; |
57 import java.net.http.HttpClient; |
58 import java.net.http.HttpClient; |
58 import java.net.http.HttpRequest; |
59 import java.net.http.HttpRequest; |
59 import java.net.http.HttpResponse; |
60 import java.net.http.HttpResponse; |
60 import java.net.http.HttpResponse.BodyHandler; |
61 import java.net.http.HttpResponse.BodyHandler; |
72 |
73 |
73 public class MaxStreams { |
74 public class MaxStreams { |
74 |
75 |
75 Http2TestServer http2TestServer; // HTTP/2 ( h2c ) |
76 Http2TestServer http2TestServer; // HTTP/2 ( h2c ) |
76 Http2TestServer https2TestServer; // HTTP/2 ( h2 ) |
77 Http2TestServer https2TestServer; // HTTP/2 ( h2 ) |
|
78 final Http2FixedHandler handler = new Http2FixedHandler(); |
77 String http2FixedURI; |
79 String http2FixedURI; |
78 String https2FixedURI; |
80 String https2FixedURI; |
79 volatile CountDownLatch latch; |
81 volatile CountDownLatch latch; |
80 ExecutorService exec; |
82 ExecutorService exec; |
|
83 final Semaphore canStartTestRun = new Semaphore(1); |
81 |
84 |
82 // we send an initial warm up request, then MAX_STREAMS+1 requests |
85 // we send an initial warm up request, then MAX_STREAMS+1 requests |
83 // in parallel. The last of them should hit the limit. |
86 // in parallel. The last of them should hit the limit. |
84 // Then we wait for all the responses and send a further request |
87 // Then we wait for all the responses and send a further request |
85 // which should succeed. The server should see (and respond to) |
88 // which should succeed. The server should see (and respond to) |
99 } |
102 } |
100 |
103 |
101 |
104 |
102 @Test(dataProvider = "uris", timeOut=20000) |
105 @Test(dataProvider = "uris", timeOut=20000) |
103 void testAsString(String uri) throws Exception { |
106 void testAsString(String uri) throws Exception { |
|
107 canStartTestRun.acquire(); |
104 latch = new CountDownLatch(1); |
108 latch = new CountDownLatch(1); |
|
109 handler.setLatch(latch); |
105 HttpClient client = HttpClient.newBuilder().build(); |
110 HttpClient client = HttpClient.newBuilder().build(); |
106 List<CompletableFuture<HttpResponse<String>>> responses = new LinkedList<>(); |
111 List<CompletableFuture<HttpResponse<String>>> responses = new LinkedList<>(); |
107 |
112 |
108 HttpRequest request = HttpRequest.newBuilder(URI.create(uri)) |
113 HttpRequest request = HttpRequest.newBuilder(URI.create(uri)) |
109 .version(HttpClient.Version.HTTP_2) |
114 .version(HttpClient.Version.HTTP_2) |
116 |
121 |
117 for (int i=0;i<MAX_STREAMS+1; i++) { |
122 for (int i=0;i<MAX_STREAMS+1; i++) { |
118 responses.add(client.sendAsync(request, BodyHandlers.ofString())); |
123 responses.add(client.sendAsync(request, BodyHandlers.ofString())); |
119 } |
124 } |
120 |
125 |
121 Thread.sleep(1000); // race possible even with latch |
126 // wait until we get the local exception before allowing the server to proceed |
|
127 try { |
|
128 CompletableFuture.anyOf(responses.toArray(new CompletableFuture<?>[0])).join(); |
|
129 } catch (Exception ee) { |
|
130 System.err.println("Expected exception 1 " + ee); |
|
131 } |
|
132 |
122 latch.countDown(); |
133 latch.countDown(); |
123 |
134 |
124 // check the first MAX_STREAMS requests succeeded |
135 // check the first MAX_STREAMS requests succeeded |
125 try { |
136 try { |
126 CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0])).join(); |
137 CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0])).join(); |
|
138 System.err.println("Did not get Expected exception 2 "); |
127 } catch (Exception ee) { |
139 } catch (Exception ee) { |
128 System.err.println("Expected exception " + ee); |
140 System.err.println("Expected exception 2 " + ee); |
129 } |
141 } |
130 int count = 0; |
142 int count = 0; |
|
143 int failures = 0; |
131 for (CompletableFuture<HttpResponse<String>> cf : responses) { |
144 for (CompletableFuture<HttpResponse<String>> cf : responses) { |
132 HttpResponse<String> r = null; |
145 HttpResponse<String> r = null; |
133 try { |
146 try { |
|
147 count++; |
134 r = cf.join(); |
148 r = cf.join(); |
135 if (r.statusCode() != 200 || !r.body().equals(RESPONSE)) |
149 if (r.statusCode() != 200 || !r.body().equals(RESPONSE)) |
136 throw new RuntimeException(); |
150 throw new RuntimeException(); |
137 } catch (Throwable t) { |
151 } catch (Throwable t) { |
138 System.err.println("Exception at count = " + count); |
152 failures++; |
|
153 System.err.printf("Failure %d at count %d\n", failures, count); |
139 System.err.println(t); |
154 System.err.println(t); |
140 t.printStackTrace(); |
155 t.printStackTrace(); |
141 count++; |
|
142 } |
156 } |
143 } |
157 } |
144 if (count != 1) { |
158 if (failures != 1) { |
145 String msg = "Expected 1 failure. Got " + count; |
159 String msg = "Expected 1 failure. Got " + failures; |
146 throw new RuntimeException(msg); |
160 throw new RuntimeException(msg); |
147 } |
161 } |
148 |
162 |
149 // make sure it succeeds now as number of streams == 0 now |
163 // make sure it succeeds now as number of streams == 0 now |
150 HttpResponse<String> warmdown = client.send(request, BodyHandlers.ofString()); |
164 HttpResponse<String> warmdown = client.send(request, BodyHandlers.ofString()); |
161 InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); |
175 InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); |
162 |
176 |
163 Properties props = new Properties(); |
177 Properties props = new Properties(); |
164 props.setProperty("http2server.settings.max_concurrent_streams", Integer.toString(MAX_STREAMS)); |
178 props.setProperty("http2server.settings.max_concurrent_streams", Integer.toString(MAX_STREAMS)); |
165 http2TestServer = new Http2TestServer("localhost", false, 0, exec, 10, props, null); |
179 http2TestServer = new Http2TestServer("localhost", false, 0, exec, 10, props, null); |
166 http2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed"); |
180 http2TestServer.addHandler(handler, "/http2/fixed"); |
167 http2FixedURI = "http://" + http2TestServer.serverAuthority()+ "/http2/fixed"; |
181 http2FixedURI = "http://" + http2TestServer.serverAuthority()+ "/http2/fixed"; |
168 http2TestServer.start(); |
182 http2TestServer.start(); |
169 |
183 |
170 https2TestServer = new Http2TestServer("localhost", true, 0, exec, 10, props, ctx); |
184 https2TestServer = new Http2TestServer("localhost", true, 0, exec, 10, props, ctx); |
171 https2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed"); |
185 https2TestServer.addHandler(handler, "/http2/fixed"); |
172 https2FixedURI = "https://" + https2TestServer.serverAuthority()+ "/http2/fixed"; |
186 https2FixedURI = "https://" + https2TestServer.serverAuthority()+ "/http2/fixed"; |
173 https2TestServer.start(); |
187 https2TestServer.start(); |
174 } |
188 } |
175 |
189 |
176 @AfterTest |
190 @AfterTest |
177 public void teardown() throws Exception { |
191 public void teardown() throws Exception { |
|
192 System.err.println("Stopping test server now"); |
178 http2TestServer.stop(); |
193 http2TestServer.stop(); |
179 } |
194 } |
180 |
195 |
181 class Http2FixedHandler implements Http2Handler { |
196 class Http2FixedHandler implements Http2Handler { |
182 volatile AtomicInteger counter = new AtomicInteger(0); |
197 final AtomicInteger counter = new AtomicInteger(0); |
|
198 CountDownLatch latch; |
|
199 |
|
200 synchronized void setLatch(CountDownLatch latch) { |
|
201 this.latch = latch; |
|
202 } |
|
203 |
|
204 synchronized CountDownLatch getLatch() { |
|
205 return latch; |
|
206 } |
183 |
207 |
184 @Override |
208 @Override |
185 public void handle(Http2TestExchange t) throws IOException { |
209 public void handle(Http2TestExchange t) throws IOException { |
|
210 int c = -1; |
186 try (InputStream is = t.getRequestBody(); |
211 try (InputStream is = t.getRequestBody(); |
187 OutputStream os = t.getResponseBody()) { |
212 OutputStream os = t.getResponseBody()) { |
188 |
213 |
189 is.readAllBytes(); |
214 is.readAllBytes(); |
190 int c = counter.getAndIncrement(); |
215 c = counter.getAndIncrement(); |
191 if (c > 0 && c <= MAX_STREAMS) { |
216 if (c > 0 && c <= MAX_STREAMS) { |
192 // Wait for latch. |
217 // Wait for latch. |
193 try { |
218 try { |
194 // don't send any replies until all requests are sent |
219 // don't send any replies until all requests are sent |
195 System.err.println("latch await"); |
220 System.err.println("latch await"); |
196 latch.await(); |
221 getLatch().await(); |
197 System.err.println("latch resume"); |
222 System.err.println("latch resume"); |
198 } catch (InterruptedException ee) {} |
223 } catch (InterruptedException ee) {} |
199 } |
224 } |
200 if (c == MAX_STREAMS + 1) |
|
201 counter = new AtomicInteger(0); |
|
202 t.sendResponseHeaders(200, RESPONSE.length()); |
225 t.sendResponseHeaders(200, RESPONSE.length()); |
203 os.write(RESPONSE.getBytes()); |
226 os.write(RESPONSE.getBytes()); |
|
227 } finally { |
|
228 // client issues MAX_STREAMS + 3 requests in total |
|
229 // but server should only see MAX_STREAMS + 2 in total. One is rejected by client |
|
230 // counter c captured before increment so final value is MAX_STREAMS + 1 |
|
231 if (c == MAX_STREAMS + 1) { |
|
232 counter.set(0); |
|
233 canStartTestRun.release(); |
|
234 } |
204 } |
235 } |
205 } |
236 } |
206 } |
237 } |
207 } |
238 } |