--- a/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java Wed Jun 08 15:50:11 2016 +0200
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java Wed Jun 08 15:19:58 2016 +0100
@@ -51,15 +51,17 @@
*/
final class WSTransmitter {
- private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<Void>>>
+ private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<WebSocket>>>
backlog = new LinkedBlockingQueue<>();
private final WSMessageSender sender;
private final WSSignalHandler handler;
+ private final WebSocket webSocket;
private boolean previousMessageSent = true;
private boolean canSendBinary = true;
private boolean canSendText = true;
- WSTransmitter(Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) {
+ WSTransmitter(WebSocket ws, Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) {
+ this.webSocket = ws;
this.handler = new WSSignalHandler(executor, this::handleSignal);
Consumer<Throwable> sendCompletion = (error) -> {
synchronized (this) {
@@ -76,41 +78,41 @@
this.sender = new WSMessageSender(channel, sendCompletion);
}
- CompletableFuture<Void> sendText(CharSequence message, boolean isLast) {
+ CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
checkAndUpdateText(isLast);
return acceptMessage(new Text(isLast, message));
}
- CompletableFuture<Void> sendText(Stream<? extends CharSequence> message) {
+ CompletableFuture<WebSocket> sendText(Stream<? extends CharSequence> message) {
checkAndUpdateText(true);
return acceptMessage(new StreamedText(message));
}
- CompletableFuture<Void> sendBinary(ByteBuffer message, boolean isLast) {
+ CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
checkAndUpdateBinary(isLast);
return acceptMessage(new Binary(isLast, message));
}
- CompletableFuture<Void> sendPing(ByteBuffer message) {
+ CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
checkSize(message.remaining(), 125);
return acceptMessage(new Ping(message));
}
- CompletableFuture<Void> sendPong(ByteBuffer message) {
+ CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
checkSize(message.remaining(), 125);
return acceptMessage(new Pong(message));
}
- CompletableFuture<Void> sendClose(WebSocket.CloseCode code, CharSequence reason) {
+ CompletableFuture<WebSocket> sendClose(WebSocket.CloseCode code, CharSequence reason) {
return acceptMessage(createCloseMessage(code, reason));
}
- CompletableFuture<Void> sendClose() {
+ CompletableFuture<WebSocket> sendClose() {
return acceptMessage(new Close(ByteBuffer.allocate(0)));
}
- private CompletableFuture<Void> acceptMessage(WSOutgoingMessage m) {
- CompletableFuture<Void> cf = new CompletableFuture<>();
+ private CompletableFuture<WebSocket> acceptMessage(WSOutgoingMessage m) {
+ CompletableFuture<WebSocket> cf = new CompletableFuture<>();
synchronized (this) {
backlog.offer(pair(m, cf));
}
@@ -123,11 +125,11 @@
synchronized (this) {
while (!backlog.isEmpty() && previousMessageSent) {
previousMessageSent = false;
- Pair<WSOutgoingMessage, CompletableFuture<Void>> p = backlog.peek();
+ Pair<WSOutgoingMessage, CompletableFuture<WebSocket>> p = backlog.peek();
boolean sent = sender.trySendFully(p.first);
if (sent) {
backlog.remove();
- p.second.complete(null);
+ p.second.complete(webSocket);
previousMessageSent = true;
}
}