src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
branchhttp-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55764 34d7cc00f87a
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Sun Nov 05 17:05:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Sun Nov 05 17:32:13 2017 +0000
@@ -25,17 +25,6 @@
 
 package jdk.incubator.http.internal.websocket;
 
-import jdk.incubator.http.WebSocket;
-import jdk.incubator.http.internal.common.Log;
-import jdk.incubator.http.internal.common.Pair;
-import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
-
 import java.io.IOException;
 import java.net.ProtocolException;
 import java.net.URI;
@@ -47,10 +36,23 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import jdk.incubator.http.WebSocket;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
+import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
 
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static jdk.incubator.http.internal.common.Pair.pair;
+import jdk.incubator.http.internal.common.Utils;
 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
@@ -72,8 +74,7 @@
      */
     private boolean lastMethodInvoked;
     private final AtomicBoolean outstandingSend = new AtomicBoolean();
-    private final CooperativeHandler sendHandler =
-              new CooperativeHandler(this::sendFirst);
+    private final SequentialScheduler sendScheduler;
     private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
             queue = new ConcurrentLinkedQueue<>();
     private final Context context = new OutgoingMessage.Context();
@@ -136,6 +137,7 @@
         this.listener = requireNonNull(listener);
         this.transmitter = new Transmitter(channel);
         this.receiver = new Receiver(messageConsumerOf(listener), channel);
+        this.sendScheduler = new SequentialScheduler(new SendFirstTask());
 
         // Set up the Closing Handshake action
         CompletableFuture.allOf(closeReceived, closeSent)
@@ -325,36 +327,39 @@
             // The queue is supposed to be unbounded
             throw new InternalError();
         }
-        sendHandler.handle();
+        sendScheduler.runOrSchedule();
         return cf;
     }
 
     /*
-     * This is the main sending method. It may be run in different threads,
+     * This is the main sending task. It may be run in different threads,
      * but never concurrently.
      */
-    private void sendFirst(Runnable whenSent) {
-        Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
-        if (p == null) {
-            whenSent.run();
-            return;
-        }
-        OutgoingMessage message = p.first;
-        CompletableFuture<WebSocket> cf = p.second;
-        try {
-            message.contextualize(context);
-            Consumer<Exception> h = e -> {
-                if (e == null) {
-                    cf.complete(WebSocketImpl.this);
-                } else {
-                    cf.completeExceptionally(e);
-                }
-                sendHandler.handle();
-                whenSent.run();
-            };
-            transmitter.send(message, h);
-        } catch (Exception t) {
-            cf.completeExceptionally(t);
+    private class SendFirstTask implements SequentialScheduler.RestartableTask {
+        @Override
+        public void run (DeferredCompleter taskCompleter){
+            Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
+            if (p == null) {
+                taskCompleter.complete();
+                return;
+            }
+            OutgoingMessage message = p.first;
+            CompletableFuture<WebSocket> cf = p.second;
+            try {
+                message.contextualize(context);
+                Consumer<Exception> h = e -> {
+                    if (e == null) {
+                        cf.complete(WebSocketImpl.this);
+                    } else {
+                        cf.completeExceptionally(e);
+                    }
+                    sendScheduler.runOrSchedule();
+                    taskCompleter.complete();
+                };
+                transmitter.send(message, h);
+            } catch (Exception t) {
+                cf.completeExceptionally(t);
+            }
         }
     }
 
@@ -436,7 +441,8 @@
                 pongSent.whenComplete(
                         (r, error) -> {
                             if (error != null) {
-                                WebSocketImpl.this.signalError(error);
+                                WebSocketImpl.this.signalError(
+                                        Utils.getCompletionCause(error));
                             }
                         }
                 );
@@ -482,7 +488,7 @@
                     enqueueClose(new Close(code, ""))
                             .whenComplete((r, e) -> {
                                 if (e != null) {
-                                    ex.addSuppressed(e);
+                                    ex.addSuppressed(Utils.getCompletionCause(e));
                                 }
                                 try {
                                     channel.close();