jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java
changeset 38864 bf2b41533aed
parent 37874 02589df0999a
child 39133 b5641ce64cf7
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java	Wed Jun 08 15:50:11 2016 +0200
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java	Wed Jun 08 15:19:58 2016 +0100
@@ -51,15 +51,17 @@
  */
 final class WSTransmitter {
 
-    private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<Void>>>
+    private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<WebSocket>>>
             backlog = new LinkedBlockingQueue<>();
     private final WSMessageSender sender;
     private final WSSignalHandler handler;
+    private final WebSocket webSocket;
     private boolean previousMessageSent = true;
     private boolean canSendBinary = true;
     private boolean canSendText = true;
 
-    WSTransmitter(Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) {
+    WSTransmitter(WebSocket ws, Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) {
+        this.webSocket = ws;
         this.handler = new WSSignalHandler(executor, this::handleSignal);
         Consumer<Throwable> sendCompletion = (error) -> {
             synchronized (this) {
@@ -76,41 +78,41 @@
         this.sender = new WSMessageSender(channel, sendCompletion);
     }
 
-    CompletableFuture<Void> sendText(CharSequence message, boolean isLast) {
+    CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
         checkAndUpdateText(isLast);
         return acceptMessage(new Text(isLast, message));
     }
 
-    CompletableFuture<Void> sendText(Stream<? extends CharSequence> message) {
+    CompletableFuture<WebSocket> sendText(Stream<? extends CharSequence> message) {
         checkAndUpdateText(true);
         return acceptMessage(new StreamedText(message));
     }
 
-    CompletableFuture<Void> sendBinary(ByteBuffer message, boolean isLast) {
+    CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
         checkAndUpdateBinary(isLast);
         return acceptMessage(new Binary(isLast, message));
     }
 
-    CompletableFuture<Void> sendPing(ByteBuffer message) {
+    CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
         checkSize(message.remaining(), 125);
         return acceptMessage(new Ping(message));
     }
 
-    CompletableFuture<Void> sendPong(ByteBuffer message) {
+    CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
         checkSize(message.remaining(), 125);
         return acceptMessage(new Pong(message));
     }
 
-    CompletableFuture<Void> sendClose(WebSocket.CloseCode code, CharSequence reason) {
+    CompletableFuture<WebSocket> sendClose(WebSocket.CloseCode code, CharSequence reason) {
         return acceptMessage(createCloseMessage(code, reason));
     }
 
-    CompletableFuture<Void> sendClose() {
+    CompletableFuture<WebSocket> sendClose() {
         return acceptMessage(new Close(ByteBuffer.allocate(0)));
     }
 
-    private CompletableFuture<Void> acceptMessage(WSOutgoingMessage m) {
-        CompletableFuture<Void> cf = new CompletableFuture<>();
+    private CompletableFuture<WebSocket> acceptMessage(WSOutgoingMessage m) {
+        CompletableFuture<WebSocket> cf = new CompletableFuture<>();
         synchronized (this) {
             backlog.offer(pair(m, cf));
         }
@@ -123,11 +125,11 @@
         synchronized (this) {
             while (!backlog.isEmpty() && previousMessageSent) {
                 previousMessageSent = false;
-                Pair<WSOutgoingMessage, CompletableFuture<Void>> p = backlog.peek();
+                Pair<WSOutgoingMessage, CompletableFuture<WebSocket>> p = backlog.peek();
                 boolean sent = sender.trySendFully(p.first);
                 if (sent) {
                     backlog.remove();
-                    p.second.complete(null);
+                    p.second.complete(webSocket);
                     previousMessageSent = true;
                 }
             }