src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Sun Nov 05 17:05:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Sun Nov 05 17:32:13 2017 +0000
@@ -25,17 +25,6 @@
package jdk.incubator.http.internal.websocket;
-import jdk.incubator.http.WebSocket;
-import jdk.incubator.http.internal.common.Log;
-import jdk.incubator.http.internal.common.Pair;
-import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
-
import java.io.IOException;
import java.net.ProtocolException;
import java.net.URI;
@@ -47,10 +36,23 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
+import jdk.incubator.http.WebSocket;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
+import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static jdk.incubator.http.internal.common.Pair.pair;
+import jdk.incubator.http.internal.common.Utils;
import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
@@ -72,8 +74,7 @@
*/
private boolean lastMethodInvoked;
private final AtomicBoolean outstandingSend = new AtomicBoolean();
- private final CooperativeHandler sendHandler =
- new CooperativeHandler(this::sendFirst);
+ private final SequentialScheduler sendScheduler;
private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
queue = new ConcurrentLinkedQueue<>();
private final Context context = new OutgoingMessage.Context();
@@ -136,6 +137,7 @@
this.listener = requireNonNull(listener);
this.transmitter = new Transmitter(channel);
this.receiver = new Receiver(messageConsumerOf(listener), channel);
+ this.sendScheduler = new SequentialScheduler(new SendFirstTask());
// Set up the Closing Handshake action
CompletableFuture.allOf(closeReceived, closeSent)
@@ -325,36 +327,39 @@
// The queue is supposed to be unbounded
throw new InternalError();
}
- sendHandler.handle();
+ sendScheduler.runOrSchedule();
return cf;
}
/*
- * This is the main sending method. It may be run in different threads,
+ * This is the main sending task. It may be run in different threads,
* but never concurrently.
*/
- private void sendFirst(Runnable whenSent) {
- Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
- if (p == null) {
- whenSent.run();
- return;
- }
- OutgoingMessage message = p.first;
- CompletableFuture<WebSocket> cf = p.second;
- try {
- message.contextualize(context);
- Consumer<Exception> h = e -> {
- if (e == null) {
- cf.complete(WebSocketImpl.this);
- } else {
- cf.completeExceptionally(e);
- }
- sendHandler.handle();
- whenSent.run();
- };
- transmitter.send(message, h);
- } catch (Exception t) {
- cf.completeExceptionally(t);
+ private class SendFirstTask implements SequentialScheduler.RestartableTask {
+ @Override
+ public void run (DeferredCompleter taskCompleter){
+ Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
+ if (p == null) {
+ taskCompleter.complete();
+ return;
+ }
+ OutgoingMessage message = p.first;
+ CompletableFuture<WebSocket> cf = p.second;
+ try {
+ message.contextualize(context);
+ Consumer<Exception> h = e -> {
+ if (e == null) {
+ cf.complete(WebSocketImpl.this);
+ } else {
+ cf.completeExceptionally(e);
+ }
+ sendScheduler.runOrSchedule();
+ taskCompleter.complete();
+ };
+ transmitter.send(message, h);
+ } catch (Exception t) {
+ cf.completeExceptionally(t);
+ }
}
}
@@ -436,7 +441,8 @@
pongSent.whenComplete(
(r, error) -> {
if (error != null) {
- WebSocketImpl.this.signalError(error);
+ WebSocketImpl.this.signalError(
+ Utils.getCompletionCause(error));
}
}
);
@@ -482,7 +488,7 @@
enqueueClose(new Close(code, ""))
.whenComplete((r, e) -> {
if (e != null) {
- ex.addSuppressed(e);
+ ex.addSuppressed(Utils.getCompletionCause(e));
}
try {
channel.close();