# HG changeset patch # User prappo # Date 1520614020 0 # Node ID 234813fd33bcca6a220ef7204e1cb6cf4ec119d5 # Parent 481d8c9acc7f87711c00254835622cb280f85315 http-client-branch: (WebSocket) test fix + output closure diff -r 481d8c9acc7f -r 234813fd33bc src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java Fri Mar 09 11:24:37 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java Fri Mar 09 16:47:00 2018 +0000 @@ -145,14 +145,12 @@ } @Override - public void close() throws IOException { + public void close() { detachedChannel.close(); } @Override public String toString() { - return super.toString()+"("+ detachedChannel.toString() + ")"; + return super.toString() + "(" + detachedChannel.toString() + ")"; } - - } diff -r 481d8c9acc7f -r 234813fd33bc src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java Fri Mar 09 11:24:37 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java Fri Mar 09 16:47:00 2018 +0000 @@ -42,24 +42,12 @@ * A stateful producer of binary representations of WebSocket messages being * sent from the client to the server. * - * An encoding methods are given original messages and byte buffers to put the - * resulting bytes to. - * - * The method is called - * repeatedly with a non-empty target buffer. Once the caller finds the buffer - * unmodified after the call returns, the message has been completely encoded. + * An encoding method is given an original message and a byte buffer to put the + * resulting bytes to. The method is called until it returns true. Then the + * reset method is called. The whole sequence repeats with next message. */ - -/* - * The state of encoding.An instance of this class is passed sequentially between messages, so - * every message in a sequence can check the context it is in and update it - * if necessary. - */ - public class MessageEncoder { - // FIXME: write frame method - private final static boolean DEBUG = false; private final SecureRandom maskingKeySource = new SecureRandom(); @@ -67,8 +55,8 @@ private final Frame.Masker payloadMasker = new Frame.Masker(); private final CharsetEncoder charsetEncoder = StandardCharsets.UTF_8.newEncoder() - .onMalformedInput(CodingErrorAction.REPORT) - .onUnmappableCharacter(CodingErrorAction.REPORT); + .onMalformedInput(CodingErrorAction.REPORT) + .onUnmappableCharacter(CodingErrorAction.REPORT); /* * This buffer is used both to encode characters to UTF-8 and to calculate * the length of the resulting frame's payload. The length of the payload @@ -86,9 +74,11 @@ private boolean flushing; private boolean moreText = true; private long headerCount; - private boolean previousLast = true; + /* Has the previous frame got its fin flag set? */ + private boolean previousFin = true; + /* Was the previous frame TEXT or a CONTINUATION thereof? */ private boolean previousText; - private boolean closed; + private boolean closed; // TODO: too late, need to check it before accepting otherwise the queue might blow up /* * How many bytes of the current message have been already encoded. @@ -120,7 +110,7 @@ } public void reset() { - // Do not reset the message stream state fields, e.g. previousLast, + // Do not reset the message stream state fields, e.g. previousFin, // previousText. Just an individual message state: started = false; flushing = false; @@ -144,7 +134,7 @@ throw new IOException("Output closed"); } if (!started) { - if (!previousText && !previousLast) { + if (!previousText && !previousFin) { // Previous data message was a partial binary message throw new IllegalStateException("Unexpected text message"); } @@ -170,6 +160,8 @@ System.out.printf("[Output] moreText%n"); } if (!moreText) { + previousFin = last; + previousText = true; return true; } intermediateBuffer.clear(); @@ -196,19 +188,16 @@ if (DEBUG) { System.out.printf("[Output] header #%s%n", headerCount); } - if (headerCount == 0) { // set once - previousLast = last; - previousText = true; - } intermediateBuffer.flip(); headerBuffer.clear(); int mask = maskingKeySource.nextInt(); - Opcode opcode = previousLast && headerCount == 0 + Opcode opcode = previousFin && headerCount == 0 ? Opcode.TEXT : Opcode.CONTINUATION; + boolean fin = last && !moreText; if (DEBUG) { System.out.printf("[Output] opcode %s%n", opcode); } - headerWriter.fin(last && !moreText) + headerWriter.fin(fin) .opcode(opcode) .payloadLen(intermediateBuffer.remaining()) .mask(mask) @@ -244,7 +233,7 @@ throw new IOException("Output closed"); } if (!started) { - if (previousText && !previousLast) { + if (previousText && !previousFin) { // Previous data message was a partial text message throw new IllegalStateException("Unexpected binary message"); } @@ -252,13 +241,13 @@ int mask = maskingKeySource.nextInt(); headerBuffer.clear(); headerWriter.fin(last) - .opcode(previousLast ? Opcode.BINARY : Opcode.CONTINUATION) + .opcode(previousFin ? Opcode.BINARY : Opcode.CONTINUATION) .payloadLen(expectedLen) .mask(mask) .write(headerBuffer); headerBuffer.flip(); payloadMasker.mask(mask); - previousLast = last; + previousFin = last; previousText = false; started = true; } @@ -412,5 +401,3 @@ return maskAvailable(intermediateBuffer, dst) >= 0; } } - - diff -r 481d8c9acc7f -r 234813fd33bc src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java Fri Mar 09 11:24:37 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java Fri Mar 09 16:47:00 2018 +0000 @@ -92,6 +92,10 @@ */ void acknowledgeReception(); // TODO: hide + /* + * If this method is invoked, then all pending and subsequent send + * operations will fail with IOException. + */ void closeOutput() throws IOException; void closeInput() throws IOException; diff -r 481d8c9acc7f -r 234813fd33bc src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Mar 09 11:24:37 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Mar 09 16:47:00 2018 +0000 @@ -37,9 +37,11 @@ import java.nio.channels.SelectionKey; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.AVAILABLE; +import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.CLOSED; import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.UNREGISTERED; import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.WAITING; @@ -70,7 +72,8 @@ private final Object closeLock = new Object(); private final RawChannel.RawEvent writeEvent = new WriteEvent(); private final RawChannel.RawEvent readEvent = new ReadEvent(); - private volatile ChannelState writeState = UNREGISTERED; + private final AtomicReference writeState + = new AtomicReference<>(UNREGISTERED); private ByteBuffer data; private volatile ChannelState readState = UNREGISTERED; private boolean inputClosed; @@ -274,6 +277,8 @@ } } } + writeState.set(CLOSED); + sendScheduler.runOrSchedule(); } /* @@ -300,11 +305,12 @@ } } - /* Common states for send and receive tasks */ + /* Common states for send and receive tasks */ enum ChannelState { UNREGISTERED, AVAILABLE, - WAITING + WAITING, + CLOSED, } @SuppressWarnings({"rawtypes"}) @@ -513,7 +519,7 @@ boolean finished = false; loop: while (true) { - final ChannelState ws = writeState; + final ChannelState ws = writeState.get(); if (DEBUG) { System.out.printf("[Transport] write state: %s%n", ws); } @@ -524,13 +530,8 @@ if (DEBUG) { System.out.printf("[Transport] registering write event%n"); } - writeState = WAITING; - try { - channel.registerEvent(writeEvent); - } catch (Throwable t) { - writeState = UNREGISTERED; - throw t; - } + channel.registerEvent(writeEvent); + writeState.compareAndSet(UNREGISTERED, WAITING); if (DEBUG) { System.out.printf("[Transport] registered write event%n"); } @@ -544,9 +545,11 @@ finished = true; break loop; // All done } else { - writeState = UNREGISTERED; + writeState.compareAndSet(AVAILABLE, UNREGISTERED); continue loop; // Effectively "goto UNREGISTERED" } + case CLOSED: + throw new IOException("Output closed"); default: throw new InternalError(String.valueOf(ws)); } @@ -667,9 +670,18 @@ @Override public void handle() { if (DEBUG) { - System.out.printf("[Transport] ready to write%n"); + System.out.printf("[Transport] write event%n"); } - writeState = AVAILABLE; + ChannelState s; + do { + s = writeState.get(); + if (s == CLOSED) { + if (DEBUG) { + System.out.printf("[Transport] write state %s %n", s); + } + break; + } + } while (!writeState.compareAndSet(s, AVAILABLE)); sendScheduler.runOrSchedule(); } } @@ -684,7 +696,7 @@ @Override public void handle() { if (DEBUG) { - System.out.printf("[Transport] ready to read%n"); + System.out.printf("[Transport] read event%n"); } readState = AVAILABLE; receiveScheduler.runOrSchedule(); diff -r 481d8c9acc7f -r 234813fd33bc src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Fri Mar 09 11:24:37 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Fri Mar 09 16:47:00 2018 +0000 @@ -211,6 +211,7 @@ if (!isLegalToSendFromClient(statusCode)) { return failedFuture(new IllegalArgumentException("statusCode")); } + // check outputClosed CompletableFuture cf = sendClose0(statusCode, reason); return replaceNull(cf); } @@ -432,7 +433,7 @@ .flip(); // Non-exclusive send; BiConsumer reporter = (r, e) -> { - if (e != null) { + if (e != null) { // Better error handing. What if already closed? signalError(Utils.getCompletionCause(e)); } }; diff -r 481d8c9acc7f -r 234813fd33bc test/jdk/ProblemList.txt --- a/test/jdk/ProblemList.txt Fri Mar 09 11:24:37 2018 +0000 +++ b/test/jdk/ProblemList.txt Fri Mar 09 16:47:00 2018 +0000 @@ -541,10 +541,6 @@ java/net/DatagramSocket/SendDatagramToBadAddress.java 7143960 macosx-all -java/net/httpclient/websocket/WebSocketTest.java 8765432 generic-all -java/net/httpclient/websocket/WebSocketTextTest.java 8765432 generic-all - - ############################################################################ # jdk_nio diff -r 481d8c9acc7f -r 234813fd33bc test/jdk/java/net/httpclient/websocket/WebSocketTest.java --- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Fri Mar 09 11:24:37 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Fri Mar 09 16:47:00 2018 +0000 @@ -59,7 +59,7 @@ private static final Class ISE = IllegalStateException.class; private static final Class IOE = IOException.class; -// @Test + @Test public void immediateAbort() throws Exception { try (DummyWebSocketServer server = serverWithCannedData(0x81, 0x00, 0x88, 0x00)) { server.open(); @@ -235,7 +235,6 @@ .join(); ws.sendClose(NORMAL_CLOSURE, "").join(); assertTrue(ws.isOutputClosed()); - assertFalse(ws.isInputClosed()); assertEquals(ws.getSubprotocol(), ""); ws.request(1); // No exceptions must be thrown } @@ -263,7 +262,6 @@ // The output closes even if the Close message has not been sent assertFalse(cf.isDone()); assertTrue(ws.isOutputClosed()); - assertFalse(ws.isInputClosed()); assertEquals(ws.getSubprotocol(), ""); } } @@ -286,7 +284,7 @@ }; } -// @Test + @Test public void abortPendingSendBinary() throws Exception { try (DummyWebSocketServer server = notReadingServer()) { server.open(); @@ -313,7 +311,7 @@ } } -// @Test + @Test public void abortPendingSendText() throws Exception { try (DummyWebSocketServer server = notReadingServer()) { server.open();