http-client-branch: merge with default http-client-branch
authorchegar
Tue, 28 Nov 2017 10:19:38 +0000
branchhttp-client-branch
changeset 55904 6167793d2cc7
parent 55903 ffdee85b13bf (current diff)
parent 55902 071affa55c69 (diff)
child 55905 0812b07be2da
http-client-branch: merge with default
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/MessageStreamConsumer.java	Tue Nov 28 10:17:11 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/MessageStreamConsumer.java	Tue Nov 28 10:19:38 2017 +0000
@@ -44,11 +44,11 @@
 
     void onClose(int statusCode, CharSequence reason);
 
-    void onError(Exception e);
-
     /*
      * Indicates the end of stream has been reached and there will be no further
      * messages.
      */
     void onComplete();
+
+    void onError(Throwable e);
 }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Tue Nov 28 10:17:11 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Tue Nov 28 10:19:38 2017 +0000
@@ -76,7 +76,12 @@
         // To ensure the initial non-final `data` will be visible
         // (happens-before) when `handler` invokes `pushContinuously`
         // the following assignment is done last:
-        pushScheduler = new SequentialScheduler(new PushContinuouslyTask());
+        pushScheduler = createScheduler();
+    }
+
+    /* Exposed for testing purposes */
+    protected SequentialScheduler createScheduler() {
+        return new SequentialScheduler(new PushContinuouslyTask());
     }
 
     private RawChannel.RawEvent createHandler() {
@@ -103,6 +108,12 @@
         pushScheduler.runOrSchedule();
     }
 
+    /*
+     * Why is this method needed? Since Receiver operates through callbacks
+     * this method allows to abstract out what constitutes as a message being
+     * received (i.e. to decide outside this type when exactly one should
+     * decrement the demand).
+     */
     void acknowledge() {
         long x = demand.decreaseAndGet(1);
         if (x < 0) {
@@ -132,7 +143,7 @@
                             reader.readFrame(data, frameConsumer);
                             int newPos = data.position();
                             assert oldPos != newPos : data; // reader always consumes bytes
-                        } catch (FailWebSocketException e) {
+                        } catch (Throwable e) {
                             pushScheduler.stop();
                             messageConsumer.onError(e);
                         }
@@ -147,7 +158,7 @@
                         try {
                             state = WAITING;
                             channel.registerEvent(event);
-                        } catch (IOException e) {
+                        } catch (Throwable e) {
                             pushScheduler.stop();
                             messageConsumer.onError(e);
                         }
@@ -155,7 +166,7 @@
                     case AVAILABLE:
                         try {
                             data = channel.read();
-                        } catch (IOException e) {
+                        } catch (Throwable e) {
                             pushScheduler.stop();
                             messageConsumer.onError(e);
                             return;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Tue Nov 28 10:17:11 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Tue Nov 28 10:19:38 2017 +0000
@@ -52,6 +52,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -67,36 +69,41 @@
  */
 public final class WebSocketImpl implements WebSocket {
 
-    private final URI uri;
-    private final String subprotocol;
-    private final Listener listener;
+    private static final int IDLE   =  0;
+    private static final int OPEN   =  1;
+    private static final int TEXT   =  2;
+    private static final int BINARY =  4;
+    private static final int PING   =  8;
+    private static final int PONG   = 16;
+    private static final int CLOSE  = 32;
+    private static final int ERROR  = 64;
 
     private volatile boolean inputClosed;
     private volatile boolean outputClosed;
 
-    /*
-     * Whether or not Listener.onClose or Listener.onError has been already
-     * invoked. We keep track of this since only one of these methods is invoked
-     * and it is invoked at most once.
-     */
-    private boolean lastMethodInvoked;
+    /* Which of the listener's methods to call next? */
+    private final AtomicInteger state = new AtomicInteger(OPEN);
+
+    /* Components of calls to Listener's methods */
+    private MessagePart part;
+    private ByteBuffer binaryData;
+    private CharSequence text;
+    private int statusCode;
+    private String reason;
+    private final AtomicReference<Throwable> error = new AtomicReference<>();
+
+    private final URI uri;
+    private final String subprotocol;
+    private final Listener listener;
+
     private final AtomicBoolean outstandingSend = new AtomicBoolean();
-    private final SequentialScheduler sendScheduler;
+    private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
     private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
             queue = new ConcurrentLinkedQueue<>();
     private final Context context = new OutgoingMessage.Context();
     private final Transmitter transmitter;
     private final Receiver receiver;
-
-    /*
-     * This lock is enforcing sequential ordering of invocations to listener's
-     * methods. It is supposed to be uncontended. The only contention that can
-     * happen is when onOpen, an asynchronous onError (not related to reading
-     * from the channel, e.g. an error from automatic Pong reply) or onClose
-     * (related to abort) happens. Since all of the above are one-shot actions,
-     * the said contention is insignificant.
-     */
-    private final Object lock = new Object();
+    private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
 
     public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
         Function<Result, WebSocket> newWebSocket = r -> {
@@ -104,9 +111,8 @@
                                                  r.subprotocol,
                                                  b.getListener(),
                                                  r.transport);
-            // The order of calls might cause a subtle effects, like CF will be
-            // returned from the buildAsync _after_ onOpen has been signalled.
-            // This means if onOpen is lengthy, it might cause some problems.
+            // This initialisation is outside of the constructor for the sake of
+            // safe publication of WebSocketImpl.this
             ws.signalOpen();
             // make sure we don't release the builder until this lambda
             // has been executed. The builder has a strong reference to
@@ -118,7 +124,7 @@
         OpeningHandshake h;
         try {
             h = new OpeningHandshake(b);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             return failedFuture(e);
         }
         return h.send().thenApply(newWebSocket);
@@ -133,117 +139,16 @@
         this.subprotocol = requireNonNull(subprotocol);
         this.listener = requireNonNull(listener);
         this.transmitter = transport.transmitter();
-        this.receiver = transport.receiver(messageConsumerOf(listener));
-        this.sendScheduler = new SequentialScheduler(new SendTask());
-    }
-
-    /*
-     * This initialisation is outside of the constructor for the sake of
-     * safe publication.
-     */
-    private void signalOpen() {
-        synchronized (lock) {
-            // TODO: might hold lock longer than needed causing prolonged
-            // contention? substitute lock for ConcurrentLinkedQueue<Runnable>?
-            try {
-                listener.onOpen(this);
-            } catch (Exception e) {
-                signalError(e);
-            }
-        }
-    }
-
-    private void signalError(Throwable error) {
-        synchronized (lock) {
-            if (lastMethodInvoked) {
-                Log.logError(error);
-            } else {
-                lastMethodInvoked = true;
-                try {
-                    try {
-                        receiver.close();
-                    } finally {
-                        transmitter.close();
-                    }
-                } catch (IOException e) {
-                    Log.logError(e);
-                }
-                try {
-                    listener.onError(this, error);
-                } catch (Exception e) {
-                    Log.logError(e);
-                }
-            }
-        }
-    }
-
-    /*
-     * Processes a Close event that came from the receiver. Invoked at most
-     * once. No further messages are pulled from the receiver.
-     */
-    private void processClose(int statusCode, String reason) {
-        inputClosed = true;
-        try {
-            receiver.close();
-        } catch (IOException e) {
-            Log.logError(e);
-        }
-        int code;
-        if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
-            code = NORMAL_CLOSURE;
-        } else {
-            code = statusCode;
-        }
-        CompletionStage<?> readyToClose = signalClose(statusCode, reason);
-        if (readyToClose == null) {
-            readyToClose = MinimalFuture.completedFuture(null);
-        }
-        readyToClose.whenComplete((r, error) -> {
-            enqueueClose(new Close(code, ""))
-                    .whenComplete((r1, error1) -> {
-                        if (error1 != null) {
-                            Log.logError(error1);
-                        }
-                    });
-        });
-    }
-
-    /*
-     * Signals a Close event (might not correspond to anything happened on the
-     * channel, e.g. `abort()`).
-     */
-    private CompletionStage<?> signalClose(int statusCode, String reason) {
-        synchronized (lock) {
-            if (lastMethodInvoked) {
-                Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
-            } else {
-                lastMethodInvoked = true;
-                try {
-                    receiver.close();
-                } catch (IOException e) {
-                    Log.logError(e);
-                }
-                try {
-                    return listener.onClose(this, statusCode, reason);
-                } catch (Exception e) {
-                    Log.logError(e);
-                }
-            }
-        }
-        return null;
+        this.receiver = transport.receiver(new SignallingMessageConsumer());
     }
 
     @Override
-    public CompletableFuture<WebSocket> sendText(CharSequence message,
-                                                 boolean isLast)
-    {
+    public CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
         return enqueueExclusively(new Text(message, isLast));
     }
 
     @Override
-    public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
-                                                   boolean isLast)
-    {
+    public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
         return enqueueExclusively(new Binary(message, isLast));
     }
 
@@ -258,8 +163,7 @@
     }
 
     @Override
-    public CompletableFuture<WebSocket> sendClose(int statusCode,
-                                                  String reason) {
+    public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
         if (!isLegalToSendFromClient(statusCode)) {
             return failedFuture(
                     new IllegalArgumentException("statusCode: " + statusCode));
@@ -275,11 +179,11 @@
     }
 
     /*
-     * Sends a Close message and then shuts down the channel for writing since
-     * no more messages are expected to be sent after this.
+     * Sends a Close message, then shuts down the transmitter since no more
+     * messages are expected to be sent after this.
      */
     private CompletableFuture<WebSocket> enqueueClose(Close m) {
-        // MUST be a CF created once and shared across sendClose, otherwise
+        // TODO: MUST be a CF created once and shared across sendClose, otherwise
         // a second sendClose may prematurely close the channel
         return enqueue(m)
                 .orTimeout(60, TimeUnit.SECONDS)
@@ -309,7 +213,7 @@
     private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m)
     {
         if (!outstandingSend.compareAndSet(false, true)) {
-            return failedFuture(new IllegalStateException("Outstanding send"));
+            return failedFuture(new IllegalStateException("Send pending"));
         }
         return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false));
     }
@@ -331,6 +235,7 @@
      * concurrently.
      */
     private class SendTask implements SequentialScheduler.RestartableTask {
+
         @Override
         public void run(DeferredCompleter taskCompleter) {
             Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
@@ -355,7 +260,7 @@
                     repeat(taskCompleter);
                 };
                 transmitter.send(message, h);
-            } catch (Exception t) {
+            } catch (Throwable t) {
                 cf.completeExceptionally(t);
                 repeat(taskCompleter);
             }
@@ -372,6 +277,8 @@
 
     @Override
     public void request(long n) {
+        // TODO: delay until state becomes ACTIVE, otherwise messages might be
+        // requested and consecutively become pending before onOpen is signalled
         receiver.request(n);
     }
 
@@ -394,13 +301,8 @@
     public void abort() {
         inputClosed = true;
         outputClosed = true;
-        try {
-            try {
-                receiver.close();
-            } finally {
-                transmitter.close();
-            }
-        } catch (IOException ignored) { }
+        receiveScheduler.stop();
+        close();
     }
 
     @Override
@@ -411,108 +313,248 @@
                 + "]";
     }
 
-    private MessageStreamConsumer messageConsumerOf(Listener listener) {
-        // Synchronization performed here in every method is not for the sake of
-        // ordering invocations to this consumer, after all they are naturally
-        // ordered in the channel. The reason is to avoid an interference with
-        // any unrelated to the channel calls to onOpen, onClose and onError.
-        return new MessageStreamConsumer() {
+    /*
+     * The assumptions about order is as follows:
+     *
+     *     - state is never changed more than twice inside the `run` method:
+     *       x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or
+     *       overwriting parts of messages creating a mess since there's no
+     *       queueing)
+     *     - OPEN is always the first state
+     *     - no messages are requested/delivered before onOpen is called (this
+     *       is implemented by making WebSocket instance accessible first in
+     *       onOpen)
+     *     - after the state has been observed as CLOSE/ERROR, the scheduler
+     *       is stopped
+     */
+    private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
 
-            @Override
-            public void onText(CharSequence data, MessagePart part) {
-                receiver.acknowledge();
-                synchronized (WebSocketImpl.this.lock) {
-                    try {
-                        listener.onText(WebSocketImpl.this, data, part);
-                    } catch (Exception e) {
-                        signalError(e);
-                    }
+        @Override
+        public void run() {
+            final int s = state.getAndSet(IDLE);
+            try {
+                switch (s) {
+                    case OPEN:
+                        processOpen();
+                        break;
+                    case TEXT:
+                        processText();
+                        break;
+                    case BINARY:
+                        processBinary();
+                        break;
+                    case PING:
+                        processPing();
+                        break;
+                    case PONG:
+                        processPong();
+                        break;
+                    case CLOSE:
+                        processClose();
+                        break;
+                    case ERROR:
+                        processError();
+                        break;
+                    case IDLE:
+                        // For debugging spurious signalling: when there was a
+                        // signal, but apparently nothing has changed
+                        break;
+                    default:
+                        throw new InternalError(String.valueOf(s));
                 }
+                // Do not keep references to arbitrary big objects we no longer
+                // need. It is unknown when the next message might come (if
+                // ever), so the following references should be null the sooner
+                // the better:
+                binaryData = null;
+                text = null;
+            } catch (Throwable t) {
+                signalError(t);
             }
+        }
 
-            @Override
-            public void onBinary(ByteBuffer data, MessagePart part) {
-                receiver.acknowledge();
-                synchronized (WebSocketImpl.this.lock) {
-                    try {
-                        listener.onBinary(WebSocketImpl.this, data.slice(), part);
-                    } catch (Exception e) {
-                        signalError(e);
-                    }
-                }
+        private void processError() throws IOException {
+            receiver.close();
+            receiveScheduler.stop();
+            Throwable err = error.get();
+            if (err instanceof FailWebSocketException) {
+                int code1 = ((FailWebSocketException) err).getStatusCode();
+                err = new ProtocolException().initCause(err);
+                enqueueClose(new Close(code1, ""))
+                        .whenComplete(
+                                (r, e) -> {
+                                    if (e != null) {
+                                        Log.logError(e);
+                                    }
+                                });
             }
+            listener.onError(WebSocketImpl.this, err);
+        }
 
-            @Override
-            public void onPing(ByteBuffer data) {
-                receiver.acknowledge();
-                // Let's make a full copy of this tiny data. What we want here
-                // is to rule out a possibility the shared data we send might be
-                // corrupted by processing in the listener.
-                ByteBuffer slice = data.slice();
-                ByteBuffer copy = ByteBuffer.allocate(data.remaining())
-                        .put(data)
-                        .flip();
-                // Non-exclusive send;
-                CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
-                pongSent.whenComplete(
-                        (r, error) -> {
-                            if (error != null) {
-                                WebSocketImpl.this.signalError(
-                                        Utils.getCompletionCause(error));
+        private void processClose() throws IOException {
+            receiver.close();
+            receiveScheduler.stop();
+            CompletionStage<?> readyToClose;
+            readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
+            if (readyToClose == null) {
+                readyToClose = MinimalFuture.completedFuture(null);
+            }
+            int code;
+            if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
+                code = NORMAL_CLOSURE;
+            } else {
+                code = statusCode;
+            }
+            readyToClose.whenComplete((r, e) -> {
+                enqueueClose(new Close(code, ""))
+                        .whenComplete((r1, e1) -> {
+                            if (e1 != null) {
+                                Log.logError(e1);
                             }
+                        });
+            });
+        }
+
+        private void processPong() {
+            listener.onPong(WebSocketImpl.this, binaryData);
+        }
+
+        private void processPing() {
+            // Let's make a full copy of this tiny data. What we want here
+            // is to rule out a possibility the shared data we send might be
+            // corrupted by processing in the listener.
+            ByteBuffer slice = binaryData.slice();
+            ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
+                    .put(binaryData)
+                    .flip();
+            // Non-exclusive send;
+            CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
+            pongSent.whenComplete(
+                    (r, e) -> {
+                        if (e != null) {
+                            signalError(Utils.getCompletionCause(e));
                         }
-                );
-                synchronized (WebSocketImpl.this.lock) {
-                    try {
-                        listener.onPing(WebSocketImpl.this, slice);
-                    } catch (Exception e) {
-                        signalError(e);
                     }
-                }
-            }
+            );
+            listener.onPing(WebSocketImpl.this, slice);
+        }
+
+        private void processBinary() {
+            listener.onBinary(WebSocketImpl.this, binaryData, part);
+        }
+
+        private void processText() {
+            listener.onText(WebSocketImpl.this, text, part);
+        }
+
+        private void processOpen() {
+            listener.onOpen(WebSocketImpl.this);
+        }
+    }
+
+    private void signalOpen() {
+        receiveScheduler.runOrSchedule();
+    }
 
-            @Override
-            public void onPong(ByteBuffer data) {
-                receiver.acknowledge();
-                synchronized (WebSocketImpl.this.lock) {
-                    try {
-                        listener.onPong(WebSocketImpl.this, data.slice());
-                    } catch (Exception e) {
-                        signalError(e);
-                    }
-                }
+    private void signalError(Throwable error) {
+        inputClosed = true;
+        outputClosed = true;
+        if (!this.error.compareAndSet(null, error) || !tryChangeState(ERROR)) {
+            Log.logError(error);
+        } else {
+            close();
+        }
+    }
+
+    private void close() {
+        try {
+            try {
+                receiver.close();
+            } finally {
+                transmitter.close();
             }
+        } catch (Throwable t) {
+            Log.logError(t);
+        }
+    }
 
-            @Override
-            public void onClose(int statusCode, CharSequence reason) {
-                receiver.acknowledge();
-                processClose(statusCode, reason.toString());
+    /*
+     * Signals a Close event (might not correspond to anything happened on the
+     * channel, i.e. might be synthetic).
+     */
+    private void signalClose(int statusCode, String reason) {
+        inputClosed = true;
+        this.statusCode = statusCode;
+        this.reason = reason;
+        if (!tryChangeState(CLOSE)) {
+            Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
+        } else {
+            try {
+                receiver.close();
+            } catch (Throwable t) {
+                Log.logError(t);
             }
+        }
+    }
 
-            @Override
-            public void onError(Exception error) {
-                inputClosed = true;
-                outputClosed = true;
-                if (!(error instanceof FailWebSocketException)) {
-                    abort();
-                    signalError(error);
-                } else {
-                    Exception ex = (Exception) new ProtocolException().initCause(error);
-                    int code = ((FailWebSocketException) error).getStatusCode();
-                    enqueueClose(new Close(code, "")) // do we have to wait for 60 secs? nah...
-                            .whenComplete((r, e) -> {
-                                if (e != null) {
-                                    ex.addSuppressed(Utils.getCompletionCause(e));
-                                }
-                                signalError(ex);
-                            });
-                }
+    private class SignallingMessageConsumer implements MessageStreamConsumer {
+
+        @Override
+        public void onText(CharSequence data, MessagePart part) {
+            receiver.acknowledge();
+            text = data;
+            WebSocketImpl.this.part = part;
+            tryChangeState(TEXT);
+        }
+
+        @Override
+        public void onBinary(ByteBuffer data, MessagePart part) {
+            receiver.acknowledge();
+            binaryData = data;
+            WebSocketImpl.this.part = part;
+            tryChangeState(BINARY);
+        }
+
+        @Override
+        public void onPing(ByteBuffer data) {
+            receiver.acknowledge();
+            binaryData = data;
+            tryChangeState(PING);
+        }
+
+        @Override
+        public void onPong(ByteBuffer data) {
+            receiver.acknowledge();
+            binaryData = data;
+            tryChangeState(PONG);
+        }
+
+        @Override
+        public void onClose(int statusCode, CharSequence reason) {
+            receiver.acknowledge();
+            signalClose(statusCode, reason.toString());
+        }
+
+        @Override
+        public void onComplete() {
+            signalClose(CLOSED_ABNORMALLY, "");
+        }
+
+        @Override
+        public void onError(Throwable error) {
+            signalError(error);
+        }
+    }
+
+    private boolean tryChangeState(int newState) {
+        while (true) {
+            int currentState = state.get();
+            if (currentState == ERROR || currentState == CLOSE) {
+                return false;
+            } else if (state.compareAndSet(currentState, newState)) {
+                receiveScheduler.runOrSchedule();
+                return true;
             }
-
-            @Override
-            public void onComplete() {
-                processClose(CLOSED_ABNORMALLY, "");
-            }
-        };
+        }
     }
 }