8180155: WebSocket secure connection get stuck after onOpen
authorprappo
Fri, 02 Jun 2017 18:32:39 +0100
changeset 45440 d952dcd38dba
parent 45439 5673d77a787b
child 45441 11568b4f0efd
child 45473 03c5450b6e4a
child 45518 4a116dd82fb5
child 45521 df88b9c9b4ae
8180155: WebSocket secure connection get stuck after onOpen 8156518: WebSocket.Builder.connectTimeout(long timeout, TimeUnit unit) implicitly affect websocket connection timeout Reviewed-by: dfuchs
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLDelegate.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java	Thu Jun 01 14:52:53 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java	Fri Jun 02 18:32:39 2017 +0100
@@ -316,13 +316,14 @@
                 })
             // 5. Handle errors and cancel any timer set
             .handle((response, ex) -> {
-                if (response != null) {
+                cancelTimer();
+                if (ex == null) {
+                    assert response != null;
                     return MinimalFuture.completedFuture(response);
                 }
                 // all exceptions thrown are handled here
                 CompletableFuture<Response> error = getExceptionalCF(ex);
                 if (error == null) {
-                    cancelTimer();
                     return responseAsyncImpl();
                 } else {
                     return error;
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLDelegate.java	Thu Jun 01 14:52:53 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLDelegate.java	Fri Jun 02 18:32:39 2017 +0100
@@ -274,9 +274,7 @@
                 int x;
                 do {
                     if (needData) {
-                        do {
-                            x = chan.read (unwrap_src);
-                        } while (x == 0);
+                        x = chan.read (unwrap_src);
                         if (x == -1) {
                             throw new IOException ("connection closed for reading");
                         }
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Thu Jun 01 14:52:53 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Fri Jun 02 18:32:39 2017 +0100
@@ -28,7 +28,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /*
@@ -58,23 +57,24 @@
     private final Frame.Reader reader = new Frame.Reader();
     private final RawChannel.RawEvent event = createHandler();
     private final AtomicLong demand = new AtomicLong();
-    private final CooperativeHandler handler =
-              new CooperativeHandler(this::pushContinuously);
-    /*
-     * Used to ensure registering the channel event at most once (i.e. to avoid
-     * multiple registrations).
-     */
-    private final AtomicBoolean readable = new AtomicBoolean();
+    private final CooperativeHandler handler;
+
     private ByteBuffer data;
+    private volatile int state;
+
+    private static final int UNREGISTERED = 0;
+    private static final int AVAILABLE    = 1;
+    private static final int WAITING      = 2;
 
     Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
         this.messageConsumer = messageConsumer;
         this.channel = channel;
+        this.frameConsumer = new FrameConsumer(this.messageConsumer);
         this.data = channel.initialByteBuffer();
-        this.frameConsumer = new FrameConsumer(this.messageConsumer);
-        // To ensure the initial non-final `data` will be read correctly
-        // (happens-before) by reader after executing readable.get()
-        readable.set(true);
+        // To ensure the initial non-final `data` will be visible
+        // (happens-before) when `handler` invokes `pushContinuously`
+        // the following assignment is done last:
+        handler = new CooperativeHandler(this::pushContinuously);
     }
 
     private RawChannel.RawEvent createHandler() {
@@ -87,7 +87,7 @@
 
             @Override
             public void handle() {
-                readable.set(true);
+                state = AVAILABLE;
                 handler.handle();
             }
         };
@@ -110,54 +110,63 @@
 
     /*
      * Stops the machinery from reading and delivering messages permanently,
-     * regardless of the current demand.
+     * regardless of the current demand and data availability.
      */
     void close() {
         handler.stop();
     }
 
     private void pushContinuously() {
-        while (readable.get() && demand.get() > 0 && !handler.isStopped()) {
-            pushOnce();
-        }
-    }
-
-    private void pushOnce() {
-        if (data == null && !readData()) {
-            return;
-        }
-        try {
-            reader.readFrame(data, frameConsumer); // Pushing frame parts to the consumer
-        } catch (FailWebSocketException e) {
-            messageConsumer.onError(e);
-            return;
-        }
-        if (!data.hasRemaining()) {
-            data = null;
+        while (!handler.isStopped()) {
+            if (data.hasRemaining()) {
+                if (demand.get() > 0) {
+                    try {
+                        int oldPos = data.position();
+                        reader.readFrame(data, frameConsumer);
+                        int newPos = data.position();
+                        assert oldPos != newPos : data; // reader always consumes bytes
+                    } catch (FailWebSocketException e) {
+                        handler.stop();
+                        messageConsumer.onError(e);
+                    }
+                    continue;
+                }
+                break;
+            }
+            switch (state) {
+                case WAITING:
+                    return;
+                case UNREGISTERED:
+                    try {
+                        state = WAITING;
+                        channel.registerEvent(event);
+                    } catch (IOException e) {
+                        handler.stop();
+                        messageConsumer.onError(e);
+                    }
+                    return;
+                case AVAILABLE:
+                    try {
+                        data = channel.read();
+                    } catch (IOException e) {
+                        handler.stop();
+                        messageConsumer.onError(e);
+                        return;
+                    }
+                    if (data == null) { // EOF
+                        handler.stop();
+                        messageConsumer.onComplete();
+                        return;
+                    } else if (!data.hasRemaining()) { // No data at the moment
+                        // Pretty much a "goto", reusing the existing code path
+                        // for registration
+                        state = UNREGISTERED;
+                    }
+                    continue;
+                default:
+                    throw new InternalError(String.valueOf(state));
+            }
         }
     }
+}
 
-    private boolean readData() {
-        try {
-            data = channel.read();
-        } catch (IOException e) {
-            messageConsumer.onError(e);
-            return false;
-        }
-        if (data == null) { // EOF
-            messageConsumer.onComplete();
-            return false;
-        } else if (!data.hasRemaining()) { // No data in the socket at the moment
-            data = null;
-            readable.set(false);
-            try {
-                channel.registerEvent(event);
-            } catch (IOException e) {
-                messageConsumer.onError(e);
-            }
-            return false;
-        }
-        assert data.hasRemaining();
-        return true;
-    }
-}