1 /* |
|
2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. |
|
8 * |
|
9 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
12 * version 2 for more details (a copy is included in the LICENSE file that |
|
13 * accompanied this code). |
|
14 * |
|
15 * You should have received a copy of the GNU General Public License version |
|
16 * 2 along with this work; if not, write to the Free Software Foundation, |
|
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
18 * |
|
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
20 * or visit www.oracle.com if you need additional information or have any |
|
21 * questions. |
|
22 */ |
|
23 |
|
24 package java.net.http.internal.websocket; |
|
25 |
|
26 import java.net.http.WebSocket; |
|
27 import org.testng.annotations.Test; |
|
28 |
|
29 import java.net.URI; |
|
30 import java.nio.ByteBuffer; |
|
31 import java.util.Collection; |
|
32 import java.util.List; |
|
33 import java.util.Random; |
|
34 import java.util.concurrent.CompletableFuture; |
|
35 import java.util.concurrent.TimeUnit; |
|
36 import java.util.concurrent.atomic.AtomicInteger; |
|
37 import java.util.function.Consumer; |
|
38 import java.util.function.Supplier; |
|
39 |
|
40 import static java.net.http.WebSocket.MessagePart.FIRST; |
|
41 import static java.net.http.WebSocket.MessagePart.LAST; |
|
42 import static java.net.http.WebSocket.MessagePart.PART; |
|
43 import static java.net.http.WebSocket.MessagePart.WHOLE; |
|
44 import static java.net.http.WebSocket.NORMAL_CLOSURE; |
|
45 import static java.net.http.internal.websocket.MockListener.Invocation.onClose; |
|
46 import static java.net.http.internal.websocket.MockListener.Invocation.onError; |
|
47 import static java.net.http.internal.websocket.MockListener.Invocation.onOpen; |
|
48 import static java.net.http.internal.websocket.MockListener.Invocation.onPing; |
|
49 import static java.net.http.internal.websocket.MockListener.Invocation.onPong; |
|
50 import static java.net.http.internal.websocket.MockListener.Invocation.onText; |
|
51 import static java.net.http.internal.websocket.MockTransport.onClose; |
|
52 import static java.net.http.internal.websocket.MockTransport.onPing; |
|
53 import static java.net.http.internal.websocket.MockTransport.onPong; |
|
54 import static java.net.http.internal.websocket.MockTransport.onText; |
|
55 import static java.net.http.internal.websocket.TestSupport.assertCompletesExceptionally; |
|
56 import static org.testng.Assert.assertEquals; |
|
57 |
|
58 /* |
|
59 * Formatting in this file may seem strange: |
|
60 * |
|
61 * ( |
|
62 * ( ...) |
|
63 * ... |
|
64 * ) |
|
65 * ... |
|
66 * |
|
67 * However there is a rationale behind it. Sometimes the level of argument |
|
68 * nesting is high, which makes it hard to manage parentheses. |
|
69 */ |
|
70 public class WebSocketImplTest { |
|
71 |
|
72 // TODO: request in onClose/onError |
|
73 // TODO: throw exception in onClose/onError |
|
74 // TODO: exception is thrown from request() |
|
75 // TODO: repeated sendClose complete normally |
|
76 // TODO: default Close message is sent if IAE is thrown from sendClose |
|
77 |
|
78 @Test |
|
79 public void testNonPositiveRequest() throws Exception { |
|
80 MockListener listener = new MockListener(Long.MAX_VALUE) { |
|
81 @Override |
|
82 protected void onOpen0(WebSocket webSocket) { |
|
83 webSocket.request(0); |
|
84 } |
|
85 }; |
|
86 WebSocket ws = newInstance(listener, List.of(now(onText("1", WHOLE)))); |
|
87 listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS); |
|
88 List<MockListener.Invocation> invocations = listener.invocations(); |
|
89 assertEquals( |
|
90 invocations, |
|
91 List.of( |
|
92 onOpen(ws), |
|
93 onError(ws, IllegalArgumentException.class) |
|
94 ) |
|
95 ); |
|
96 } |
|
97 |
|
98 @Test |
|
99 public void testText1() throws Exception { |
|
100 MockListener listener = new MockListener(Long.MAX_VALUE); |
|
101 WebSocket ws = newInstance( |
|
102 listener, |
|
103 List.of( |
|
104 now(onText("1", FIRST)), |
|
105 now(onText("2", PART)), |
|
106 now(onText("3", LAST)), |
|
107 now(onClose(NORMAL_CLOSURE, "no reason")) |
|
108 ) |
|
109 ); |
|
110 listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS); |
|
111 List<MockListener.Invocation> invocations = listener.invocations(); |
|
112 assertEquals( |
|
113 invocations, |
|
114 List.of( |
|
115 onOpen(ws), |
|
116 onText(ws, "1", FIRST), |
|
117 onText(ws, "2", PART), |
|
118 onText(ws, "3", LAST), |
|
119 onClose(ws, NORMAL_CLOSURE, "no reason") |
|
120 ) |
|
121 ); |
|
122 } |
|
123 |
|
124 @Test |
|
125 public void testText2() throws Exception { |
|
126 MockListener listener = new MockListener(Long.MAX_VALUE); |
|
127 WebSocket ws = newInstance( |
|
128 listener, |
|
129 List.of( |
|
130 now(onText("1", FIRST)), |
|
131 seconds(1, onText("2", PART)), |
|
132 now(onText("3", LAST)), |
|
133 seconds(1, onClose(NORMAL_CLOSURE, "no reason")) |
|
134 ) |
|
135 ); |
|
136 listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS); |
|
137 List<MockListener.Invocation> invocations = listener.invocations(); |
|
138 assertEquals( |
|
139 invocations, |
|
140 List.of( |
|
141 onOpen(ws), |
|
142 onText(ws, "1", FIRST), |
|
143 onText(ws, "2", PART), |
|
144 onText(ws, "3", LAST), |
|
145 onClose(ws, NORMAL_CLOSURE, "no reason") |
|
146 ) |
|
147 ); |
|
148 } |
|
149 |
|
150 @Test |
|
151 public void testTextIntermixedWithPongs() throws Exception { |
|
152 MockListener listener = new MockListener(Long.MAX_VALUE); |
|
153 WebSocket ws = newInstance( |
|
154 listener, |
|
155 List.of( |
|
156 now(onText("1", FIRST)), |
|
157 now(onText("2", PART)), |
|
158 now(onPong(ByteBuffer.allocate(16))), |
|
159 seconds(1, onPong(ByteBuffer.allocate(32))), |
|
160 now(onText("3", LAST)), |
|
161 now(onPong(ByteBuffer.allocate(64))), |
|
162 now(onClose(NORMAL_CLOSURE, "no reason")) |
|
163 ) |
|
164 ); |
|
165 listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS); |
|
166 List<MockListener.Invocation> invocations = listener.invocations(); |
|
167 assertEquals( |
|
168 invocations, |
|
169 List.of( |
|
170 onOpen(ws), |
|
171 onText(ws, "1", FIRST), |
|
172 onText(ws, "2", PART), |
|
173 onPong(ws, ByteBuffer.allocate(16)), |
|
174 onPong(ws, ByteBuffer.allocate(32)), |
|
175 onText(ws, "3", LAST), |
|
176 onPong(ws, ByteBuffer.allocate(64)), |
|
177 onClose(ws, NORMAL_CLOSURE, "no reason") |
|
178 ) |
|
179 ); |
|
180 } |
|
181 |
|
182 @Test |
|
183 public void testTextIntermixedWithPings() throws Exception { |
|
184 MockListener listener = new MockListener(Long.MAX_VALUE); |
|
185 WebSocket ws = newInstance( |
|
186 listener, |
|
187 List.of( |
|
188 now(onText("1", FIRST)), |
|
189 now(onText("2", PART)), |
|
190 now(onPing(ByteBuffer.allocate(16))), |
|
191 seconds(1, onPing(ByteBuffer.allocate(32))), |
|
192 now(onText("3", LAST)), |
|
193 now(onPing(ByteBuffer.allocate(64))), |
|
194 now(onClose(NORMAL_CLOSURE, "no reason")) |
|
195 ) |
|
196 ); |
|
197 listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS); |
|
198 List<MockListener.Invocation> invocations = listener.invocations(); |
|
199 assertEquals( |
|
200 invocations, |
|
201 List.of( |
|
202 onOpen(ws), |
|
203 onText(ws, "1", FIRST), |
|
204 onText(ws, "2", PART), |
|
205 onPing(ws, ByteBuffer.allocate(16)), |
|
206 onPing(ws, ByteBuffer.allocate(32)), |
|
207 onText(ws, "3", LAST), |
|
208 onPing(ws, ByteBuffer.allocate(64)), |
|
209 onClose(ws, NORMAL_CLOSURE, "no reason")) |
|
210 ); |
|
211 } |
|
212 |
|
213 // Tease out "java.lang.IllegalStateException: Send pending" due to possible |
|
214 // race between sending a message and replenishing the permit |
|
215 @Test |
|
216 public void testManyTextMessages() { |
|
217 WebSocketImpl ws = newInstance( |
|
218 new MockListener(1), |
|
219 new TransportFactory() { |
|
220 @Override |
|
221 public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier, |
|
222 MessageStreamConsumer consumer) { |
|
223 |
|
224 final Random r = new Random(); |
|
225 |
|
226 return new MockTransport<>(sendResultSupplier, consumer) { |
|
227 @Override |
|
228 protected CompletableFuture<T> defaultSend() { |
|
229 return millis(r.nextInt(100), result()); |
|
230 } |
|
231 }; |
|
232 } |
|
233 }); |
|
234 int NUM_MESSAGES = 512; |
|
235 CompletableFuture<WebSocket> current = CompletableFuture.completedFuture(ws); |
|
236 for (int i = 0; i < NUM_MESSAGES; i++) { |
|
237 current = current.thenCompose(w -> w.sendText(" ", true)); |
|
238 } |
|
239 current.join(); |
|
240 MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport(); |
|
241 assertEquals(transport.invocations().size(), NUM_MESSAGES); |
|
242 } |
|
243 |
|
244 @Test |
|
245 public void testManyBinaryMessages() { |
|
246 WebSocketImpl ws = newInstance( |
|
247 new MockListener(1), |
|
248 new TransportFactory() { |
|
249 @Override |
|
250 public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier, |
|
251 MessageStreamConsumer consumer) { |
|
252 |
|
253 final Random r = new Random(); |
|
254 |
|
255 return new MockTransport<>(sendResultSupplier, consumer) { |
|
256 @Override |
|
257 protected CompletableFuture<T> defaultSend() { |
|
258 return millis(r.nextInt(150), result()); |
|
259 } |
|
260 }; |
|
261 } |
|
262 }); |
|
263 CompletableFuture<WebSocket> start = new CompletableFuture<>(); |
|
264 |
|
265 int NUM_MESSAGES = 512; |
|
266 CompletableFuture<WebSocket> current = start; |
|
267 for (int i = 0; i < NUM_MESSAGES; i++) { |
|
268 current = current.thenComposeAsync(w -> w.sendBinary(ByteBuffer.allocate(1), true)); |
|
269 } |
|
270 |
|
271 start.completeAsync(() -> ws); |
|
272 current.join(); |
|
273 |
|
274 MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport(); |
|
275 assertEquals(transport.invocations().size(), NUM_MESSAGES); |
|
276 } |
|
277 |
|
278 |
|
279 @Test |
|
280 public void sendTextImmediately() { |
|
281 WebSocketImpl ws = newInstance( |
|
282 new MockListener(1), |
|
283 new TransportFactory() { |
|
284 @Override |
|
285 public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier, |
|
286 MessageStreamConsumer consumer) { |
|
287 return new MockTransport<>(sendResultSupplier, consumer); |
|
288 } |
|
289 }); |
|
290 CompletableFuture.completedFuture(ws) |
|
291 .thenCompose(w -> w.sendText("1", true)) |
|
292 .thenCompose(w -> w.sendText("2", true)) |
|
293 .thenCompose(w -> w.sendText("3", true)) |
|
294 .join(); |
|
295 MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport(); |
|
296 assertEquals(transport.invocations().size(), 3); |
|
297 } |
|
298 |
|
299 @Test |
|
300 public void sendTextWithDelay() { |
|
301 MockListener listener = new MockListener(1); |
|
302 WebSocketImpl ws = newInstance( |
|
303 listener, |
|
304 new TransportFactory() { |
|
305 @Override |
|
306 public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier, |
|
307 MessageStreamConsumer consumer) { |
|
308 return new MockTransport<>(sendResultSupplier, consumer) { |
|
309 @Override |
|
310 protected CompletableFuture<T> defaultSend() { |
|
311 return seconds(1, result()); |
|
312 } |
|
313 }; |
|
314 } |
|
315 }); |
|
316 CompletableFuture.completedFuture(ws) |
|
317 .thenCompose(w -> w.sendText("1", true)) |
|
318 .thenCompose(w -> w.sendText("2", true)) |
|
319 .thenCompose(w -> w.sendText("3", true)) |
|
320 .join(); |
|
321 assertEquals(listener.invocations(), List.of(onOpen(ws))); |
|
322 MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport(); |
|
323 assertEquals(transport.invocations().size(), 3); |
|
324 } |
|
325 |
|
326 @Test |
|
327 public void sendTextMixedDelay() { |
|
328 MockListener listener = new MockListener(1); |
|
329 WebSocketImpl ws = newInstance( |
|
330 listener, |
|
331 new TransportFactory() { |
|
332 |
|
333 final Random r = new Random(); |
|
334 |
|
335 @Override |
|
336 public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier, |
|
337 MessageStreamConsumer consumer) { |
|
338 return new MockTransport<>(sendResultSupplier, consumer) { |
|
339 @Override |
|
340 protected CompletableFuture<T> defaultSend() { |
|
341 return r.nextBoolean() |
|
342 ? seconds(1, result()) |
|
343 : now(result()); |
|
344 } |
|
345 }; |
|
346 } |
|
347 }); |
|
348 CompletableFuture.completedFuture(ws) |
|
349 .thenCompose(w -> w.sendText("1", true)) |
|
350 .thenCompose(w -> w.sendText("2", true)) |
|
351 .thenCompose(w -> w.sendText("3", true)) |
|
352 .thenCompose(w -> w.sendText("4", true)) |
|
353 .thenCompose(w -> w.sendText("5", true)) |
|
354 .thenCompose(w -> w.sendText("6", true)) |
|
355 .thenCompose(w -> w.sendText("7", true)) |
|
356 .thenCompose(w -> w.sendText("8", true)) |
|
357 .thenCompose(w -> w.sendText("9", true)) |
|
358 .join(); |
|
359 assertEquals(listener.invocations(), List.of(onOpen(ws))); |
|
360 MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport(); |
|
361 assertEquals(transport.invocations().size(), 9); |
|
362 } |
|
363 |
|
364 @Test(enabled = false) // temporarily disabled |
|
365 public void sendControlMessagesConcurrently() { |
|
366 MockListener listener = new MockListener(1); |
|
367 |
|
368 CompletableFuture<?> first = new CompletableFuture<>(); // barrier |
|
369 |
|
370 WebSocketImpl ws = newInstance( |
|
371 listener, |
|
372 new TransportFactory() { |
|
373 |
|
374 final AtomicInteger i = new AtomicInteger(); |
|
375 |
|
376 @Override |
|
377 public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier, |
|
378 MessageStreamConsumer consumer) { |
|
379 return new MockTransport<>(sendResultSupplier, consumer) { |
|
380 @Override |
|
381 protected CompletableFuture<T> defaultSend() { |
|
382 if (i.incrementAndGet() == 1) { |
|
383 return first.thenApply(o -> result()); |
|
384 } else { |
|
385 return now(result()); |
|
386 } |
|
387 } |
|
388 }; |
|
389 } |
|
390 }); |
|
391 |
|
392 CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0)); |
|
393 CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0)); |
|
394 CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, ""); |
|
395 CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, ""); |
|
396 CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0)); |
|
397 CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0)); |
|
398 |
|
399 first.complete(null); |
|
400 // Don't care about exceptional completion, only that all of them have |
|
401 // completed |
|
402 CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6) |
|
403 .handle((v, e) -> null).join(); |
|
404 |
|
405 cf3.join(); /* Check that sendClose has completed normally */ |
|
406 cf4.join(); /* Check that repeated sendClose has completed normally */ |
|
407 assertCompletesExceptionally(IllegalStateException.class, cf5); |
|
408 assertCompletesExceptionally(IllegalStateException.class, cf6); |
|
409 |
|
410 assertEquals(listener.invocations(), List.of(onOpen(ws))); |
|
411 MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport(); |
|
412 assertEquals(transport.invocations().size(), 3); // 6 minus 3 that were not accepted |
|
413 } |
|
414 |
|
415 private static <T> CompletableFuture<T> seconds(long val, T result) { |
|
416 return new CompletableFuture<T>() |
|
417 .completeOnTimeout(result, val, TimeUnit.SECONDS); |
|
418 } |
|
419 |
|
420 private static <T> CompletableFuture<T> millis(long val, T result) { |
|
421 return new CompletableFuture<T>() |
|
422 .completeOnTimeout(result, val, TimeUnit.MILLISECONDS); |
|
423 } |
|
424 |
|
425 private static <T> CompletableFuture<T> now(T result) { |
|
426 return CompletableFuture.completedFuture(result); |
|
427 } |
|
428 |
|
429 private static WebSocketImpl newInstance( |
|
430 WebSocket.Listener listener, |
|
431 Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> input) { |
|
432 TransportFactory factory = new TransportFactory() { |
|
433 @Override |
|
434 public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier, |
|
435 MessageStreamConsumer consumer) { |
|
436 return new MockTransport<T>(sendResultSupplier, consumer) { |
|
437 @Override |
|
438 protected Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> receive() { |
|
439 return input; |
|
440 } |
|
441 }; |
|
442 } |
|
443 }; |
|
444 return newInstance(listener, factory); |
|
445 } |
|
446 |
|
447 private static WebSocketImpl newInstance(WebSocket.Listener listener, |
|
448 TransportFactory factory) { |
|
449 URI uri = URI.create("ws://localhost"); |
|
450 String subprotocol = ""; |
|
451 return WebSocketImpl.newInstance(uri, subprotocol, listener, factory); |
|
452 } |
|
453 } |
|