# HG changeset patch # User prappo # Date 1496424759 -3600 # Node ID d952dcd38dba7f483f6199d381a575ee5951d8eb # Parent 5673d77a787b2a325a2ac934cdb4ffaee7ebb97a 8180155: WebSocket secure connection get stuck after onOpen 8156518: WebSocket.Builder.connectTimeout(long timeout, TimeUnit unit) implicitly affect websocket connection timeout Reviewed-by: dfuchs diff -r 5673d77a787b -r d952dcd38dba jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Thu Jun 01 14:52:53 2017 -0700 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Fri Jun 02 18:32:39 2017 +0100 @@ -316,13 +316,14 @@ }) // 5. Handle errors and cancel any timer set .handle((response, ex) -> { - if (response != null) { + cancelTimer(); + if (ex == null) { + assert response != null; return MinimalFuture.completedFuture(response); } // all exceptions thrown are handled here CompletableFuture error = getExceptionalCF(ex); if (error == null) { - cancelTimer(); return responseAsyncImpl(); } else { return error; diff -r 5673d77a787b -r d952dcd38dba jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLDelegate.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLDelegate.java Thu Jun 01 14:52:53 2017 -0700 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLDelegate.java Fri Jun 02 18:32:39 2017 +0100 @@ -274,9 +274,7 @@ int x; do { if (needData) { - do { - x = chan.read (unwrap_src); - } while (x == 0); + x = chan.read (unwrap_src); if (x == -1) { throw new IOException ("connection closed for reading"); } diff -r 5673d77a787b -r d952dcd38dba jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java Thu Jun 01 14:52:53 2017 -0700 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java Fri Jun 02 18:32:39 2017 +0100 @@ -28,7 +28,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /* @@ -58,23 +57,24 @@ private final Frame.Reader reader = new Frame.Reader(); private final RawChannel.RawEvent event = createHandler(); private final AtomicLong demand = new AtomicLong(); - private final CooperativeHandler handler = - new CooperativeHandler(this::pushContinuously); - /* - * Used to ensure registering the channel event at most once (i.e. to avoid - * multiple registrations). - */ - private final AtomicBoolean readable = new AtomicBoolean(); + private final CooperativeHandler handler; + private ByteBuffer data; + private volatile int state; + + private static final int UNREGISTERED = 0; + private static final int AVAILABLE = 1; + private static final int WAITING = 2; Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) { this.messageConsumer = messageConsumer; this.channel = channel; + this.frameConsumer = new FrameConsumer(this.messageConsumer); this.data = channel.initialByteBuffer(); - this.frameConsumer = new FrameConsumer(this.messageConsumer); - // To ensure the initial non-final `data` will be read correctly - // (happens-before) by reader after executing readable.get() - readable.set(true); + // To ensure the initial non-final `data` will be visible + // (happens-before) when `handler` invokes `pushContinuously` + // the following assignment is done last: + handler = new CooperativeHandler(this::pushContinuously); } private RawChannel.RawEvent createHandler() { @@ -87,7 +87,7 @@ @Override public void handle() { - readable.set(true); + state = AVAILABLE; handler.handle(); } }; @@ -110,54 +110,63 @@ /* * Stops the machinery from reading and delivering messages permanently, - * regardless of the current demand. + * regardless of the current demand and data availability. */ void close() { handler.stop(); } private void pushContinuously() { - while (readable.get() && demand.get() > 0 && !handler.isStopped()) { - pushOnce(); - } - } - - private void pushOnce() { - if (data == null && !readData()) { - return; - } - try { - reader.readFrame(data, frameConsumer); // Pushing frame parts to the consumer - } catch (FailWebSocketException e) { - messageConsumer.onError(e); - return; - } - if (!data.hasRemaining()) { - data = null; + while (!handler.isStopped()) { + if (data.hasRemaining()) { + if (demand.get() > 0) { + try { + int oldPos = data.position(); + reader.readFrame(data, frameConsumer); + int newPos = data.position(); + assert oldPos != newPos : data; // reader always consumes bytes + } catch (FailWebSocketException e) { + handler.stop(); + messageConsumer.onError(e); + } + continue; + } + break; + } + switch (state) { + case WAITING: + return; + case UNREGISTERED: + try { + state = WAITING; + channel.registerEvent(event); + } catch (IOException e) { + handler.stop(); + messageConsumer.onError(e); + } + return; + case AVAILABLE: + try { + data = channel.read(); + } catch (IOException e) { + handler.stop(); + messageConsumer.onError(e); + return; + } + if (data == null) { // EOF + handler.stop(); + messageConsumer.onComplete(); + return; + } else if (!data.hasRemaining()) { // No data at the moment + // Pretty much a "goto", reusing the existing code path + // for registration + state = UNREGISTERED; + } + continue; + default: + throw new InternalError(String.valueOf(state)); + } } } +} - private boolean readData() { - try { - data = channel.read(); - } catch (IOException e) { - messageConsumer.onError(e); - return false; - } - if (data == null) { // EOF - messageConsumer.onComplete(); - return false; - } else if (!data.hasRemaining()) { // No data in the socket at the moment - data = null; - readable.set(false); - try { - channel.registerEvent(event); - } catch (IOException e) { - messageConsumer.onError(e); - } - return false; - } - assert data.hasRemaining(); - return true; - } -}