33 import java.io.IOException; |
33 import java.io.IOException; |
34 import java.nio.ByteBuffer; |
34 import java.nio.ByteBuffer; |
35 import java.nio.CharBuffer; |
35 import java.nio.CharBuffer; |
36 import java.nio.channels.SocketChannel; |
36 import java.nio.channels.SocketChannel; |
37 import java.nio.charset.StandardCharsets; |
37 import java.nio.charset.StandardCharsets; |
|
38 import java.util.ArrayList; |
|
39 import java.util.List; |
38 import java.util.concurrent.CompletableFuture; |
40 import java.util.concurrent.CompletableFuture; |
39 import java.util.concurrent.CompletionException; |
41 import java.util.concurrent.CompletionException; |
40 import java.util.concurrent.CompletionStage; |
42 import java.util.concurrent.CompletionStage; |
41 import java.util.concurrent.TimeUnit; |
43 import java.util.concurrent.TimeUnit; |
42 import java.util.concurrent.TimeoutException; |
44 import java.util.concurrent.TimeoutException; |
130 // the first request |
132 // the first request |
131 try { |
133 try { |
132 messageReceived.get(10, TimeUnit.SECONDS); |
134 messageReceived.get(10, TimeUnit.SECONDS); |
133 fail(); |
135 fail(); |
134 } catch (TimeoutException expected) { } |
136 } catch (TimeoutException expected) { } |
|
137 // TODO: No send operations MUST succeed |
|
138 // assertCompletesExceptionally(IOE, ws.sendText("text!", false)); |
|
139 // assertCompletesExceptionally(IOE, ws.sendText("text!", true)); |
|
140 // assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false)); |
|
141 // assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true)); |
|
142 // assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16))); |
|
143 // assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16))); |
|
144 // assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason")); |
135 } |
145 } |
136 } |
146 } |
137 |
147 |
138 @Test |
148 @Test |
139 public void testNull() throws IOException { |
149 public void testNull() throws IOException { |
515 assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(124))); |
525 assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(124))); |
516 assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(1))); |
526 assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(1))); |
517 assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0))); |
527 assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0))); |
518 } |
528 } |
519 } |
529 } |
|
530 |
|
531 @Test |
|
532 public void aggregatingMessages() throws IOException { |
|
533 |
|
534 List<String> expected = List.of("alpha", "beta", "gamma", "delta"); |
|
535 |
|
536 int[] binary = new int[]{ |
|
537 0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha" |
|
538 0x01, 0x02, 0x62, 0x65, // "be |
|
539 0x80, 0x02, 0x74, 0x61, // ta" |
|
540 0x01, 0x01, 0x67, // "g |
|
541 0x00, 0x01, 0x61, // a |
|
542 0x00, 0x00, // |
|
543 0x00, 0x00, // |
|
544 0x00, 0x01, 0x6d, // m |
|
545 0x00, 0x01, 0x6d, // m |
|
546 0x80, 0x01, 0x61, // a" |
|
547 0x8a, 0x00, // <PONG> |
|
548 0x01, 0x04, 0x64, 0x65, 0x6c, 0x74, // "delt |
|
549 0x00, 0x01, 0x61, // a |
|
550 0x80, 0x00, // " |
|
551 0x88, 0x00 // <CLOSE> |
|
552 |
|
553 }; |
|
554 CompletableFuture<List<String>> actual = new CompletableFuture<>(); |
|
555 |
|
556 |
|
557 try (DummyWebSocketServer server = serverWithCannedData(binary)) { |
|
558 server.open(); |
|
559 |
|
560 WebSocket.Listener listener = new WebSocket.Listener() { |
|
561 |
|
562 List<CharSequence> parts = new ArrayList<>(); |
|
563 CompletableFuture<?> currentCf; |
|
564 List<String> collected = new ArrayList<>(); |
|
565 |
|
566 @Override |
|
567 public CompletionStage<?> onText(WebSocket webSocket, |
|
568 CharSequence message, |
|
569 WebSocket.MessagePart part) { |
|
570 switch (part) { |
|
571 case WHOLE: |
|
572 CompletableFuture<?> cf = new CompletableFuture<>(); |
|
573 cf.thenRun(() -> webSocket.request(1)); |
|
574 processWholeMessage(List.of(message), cf); |
|
575 return cf; |
|
576 case FIRST: |
|
577 parts.add(message); |
|
578 currentCf = new CompletableFuture<>(); |
|
579 currentCf.thenRun(() -> webSocket.request(1)); |
|
580 webSocket.request(1); |
|
581 break; |
|
582 case PART: |
|
583 parts.add(message); |
|
584 webSocket.request(1); |
|
585 break; |
|
586 case LAST: |
|
587 parts.add(message); |
|
588 List<CharSequence> copy = List.copyOf(parts); |
|
589 parts.clear(); |
|
590 CompletableFuture<?> cf1 = currentCf; |
|
591 currentCf = null; |
|
592 processWholeMessage(copy, cf1); |
|
593 return cf1; |
|
594 } |
|
595 return currentCf; |
|
596 } |
|
597 |
|
598 @Override |
|
599 public CompletionStage<?> onClose(WebSocket webSocket, |
|
600 int statusCode, |
|
601 String reason) { |
|
602 actual.complete(collected); |
|
603 return null; |
|
604 } |
|
605 |
|
606 public void processWholeMessage(List<CharSequence> data, |
|
607 CompletableFuture<?> cf) { |
|
608 StringBuilder b = new StringBuilder(); |
|
609 data.forEach(b::append); |
|
610 String s = b.toString(); |
|
611 System.out.println(s); |
|
612 cf.complete(null); |
|
613 collected.add(s); |
|
614 } |
|
615 }; |
|
616 WebSocket ws = newHttpClient() |
|
617 .newWebSocketBuilder() |
|
618 .buildAsync(server.getURI(), listener) |
|
619 .join(); |
|
620 |
|
621 List<String> a = actual.join(); |
|
622 assertEquals(a, expected); |
|
623 } |
|
624 } |
520 } |
625 } |