--- a/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java Fri Mar 09 16:47:00 2018 +0000
@@ -145,14 +145,12 @@
}
@Override
- public void close() throws IOException {
+ public void close() {
detachedChannel.close();
}
@Override
public String toString() {
- return super.toString()+"("+ detachedChannel.toString() + ")";
+ return super.toString() + "(" + detachedChannel.toString() + ")";
}
-
-
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java Fri Mar 09 16:47:00 2018 +0000
@@ -42,24 +42,12 @@
* A stateful producer of binary representations of WebSocket messages being
* sent from the client to the server.
*
- * An encoding methods are given original messages and byte buffers to put the
- * resulting bytes to.
- *
- * The method is called
- * repeatedly with a non-empty target buffer. Once the caller finds the buffer
- * unmodified after the call returns, the message has been completely encoded.
+ * An encoding method is given an original message and a byte buffer to put the
+ * resulting bytes to. The method is called until it returns true. Then the
+ * reset method is called. The whole sequence repeats with next message.
*/
-
-/*
- * The state of encoding.An instance of this class is passed sequentially between messages, so
- * every message in a sequence can check the context it is in and update it
- * if necessary.
- */
-
public class MessageEncoder {
- // FIXME: write frame method
-
private final static boolean DEBUG = false;
private final SecureRandom maskingKeySource = new SecureRandom();
@@ -67,8 +55,8 @@
private final Frame.Masker payloadMasker = new Frame.Masker();
private final CharsetEncoder charsetEncoder
= StandardCharsets.UTF_8.newEncoder()
- .onMalformedInput(CodingErrorAction.REPORT)
- .onUnmappableCharacter(CodingErrorAction.REPORT);
+ .onMalformedInput(CodingErrorAction.REPORT)
+ .onUnmappableCharacter(CodingErrorAction.REPORT);
/*
* This buffer is used both to encode characters to UTF-8 and to calculate
* the length of the resulting frame's payload. The length of the payload
@@ -86,9 +74,11 @@
private boolean flushing;
private boolean moreText = true;
private long headerCount;
- private boolean previousLast = true;
+ /* Has the previous frame got its fin flag set? */
+ private boolean previousFin = true;
+ /* Was the previous frame TEXT or a CONTINUATION thereof? */
private boolean previousText;
- private boolean closed;
+ private boolean closed; // TODO: too late, need to check it before accepting otherwise the queue might blow up
/*
* How many bytes of the current message have been already encoded.
@@ -120,7 +110,7 @@
}
public void reset() {
- // Do not reset the message stream state fields, e.g. previousLast,
+ // Do not reset the message stream state fields, e.g. previousFin,
// previousText. Just an individual message state:
started = false;
flushing = false;
@@ -144,7 +134,7 @@
throw new IOException("Output closed");
}
if (!started) {
- if (!previousText && !previousLast) {
+ if (!previousText && !previousFin) {
// Previous data message was a partial binary message
throw new IllegalStateException("Unexpected text message");
}
@@ -170,6 +160,8 @@
System.out.printf("[Output] moreText%n");
}
if (!moreText) {
+ previousFin = last;
+ previousText = true;
return true;
}
intermediateBuffer.clear();
@@ -196,19 +188,16 @@
if (DEBUG) {
System.out.printf("[Output] header #%s%n", headerCount);
}
- if (headerCount == 0) { // set once
- previousLast = last;
- previousText = true;
- }
intermediateBuffer.flip();
headerBuffer.clear();
int mask = maskingKeySource.nextInt();
- Opcode opcode = previousLast && headerCount == 0
+ Opcode opcode = previousFin && headerCount == 0
? Opcode.TEXT : Opcode.CONTINUATION;
+ boolean fin = last && !moreText;
if (DEBUG) {
System.out.printf("[Output] opcode %s%n", opcode);
}
- headerWriter.fin(last && !moreText)
+ headerWriter.fin(fin)
.opcode(opcode)
.payloadLen(intermediateBuffer.remaining())
.mask(mask)
@@ -244,7 +233,7 @@
throw new IOException("Output closed");
}
if (!started) {
- if (previousText && !previousLast) {
+ if (previousText && !previousFin) {
// Previous data message was a partial text message
throw new IllegalStateException("Unexpected binary message");
}
@@ -252,13 +241,13 @@
int mask = maskingKeySource.nextInt();
headerBuffer.clear();
headerWriter.fin(last)
- .opcode(previousLast ? Opcode.BINARY : Opcode.CONTINUATION)
+ .opcode(previousFin ? Opcode.BINARY : Opcode.CONTINUATION)
.payloadLen(expectedLen)
.mask(mask)
.write(headerBuffer);
headerBuffer.flip();
payloadMasker.mask(mask);
- previousLast = last;
+ previousFin = last;
previousText = false;
started = true;
}
@@ -412,5 +401,3 @@
return maskAvailable(intermediateBuffer, dst) >= 0;
}
}
-
-
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java Fri Mar 09 16:47:00 2018 +0000
@@ -92,6 +92,10 @@
*/
void acknowledgeReception(); // TODO: hide
+ /*
+ * If this method is invoked, then all pending and subsequent send
+ * operations will fail with IOException.
+ */
void closeOutput() throws IOException;
void closeInput() throws IOException;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Mar 09 16:47:00 2018 +0000
@@ -37,9 +37,11 @@
import java.nio.channels.SelectionKey;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.AVAILABLE;
+import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.CLOSED;
import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.UNREGISTERED;
import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.WAITING;
@@ -70,7 +72,8 @@
private final Object closeLock = new Object();
private final RawChannel.RawEvent writeEvent = new WriteEvent();
private final RawChannel.RawEvent readEvent = new ReadEvent();
- private volatile ChannelState writeState = UNREGISTERED;
+ private final AtomicReference<ChannelState> writeState
+ = new AtomicReference<>(UNREGISTERED);
private ByteBuffer data;
private volatile ChannelState readState = UNREGISTERED;
private boolean inputClosed;
@@ -274,6 +277,8 @@
}
}
}
+ writeState.set(CLOSED);
+ sendScheduler.runOrSchedule();
}
/*
@@ -300,11 +305,12 @@
}
}
- /* Common states for send and receive tasks */
+ /* Common states for send and receive tasks */
enum ChannelState {
UNREGISTERED,
AVAILABLE,
- WAITING
+ WAITING,
+ CLOSED,
}
@SuppressWarnings({"rawtypes"})
@@ -513,7 +519,7 @@
boolean finished = false;
loop:
while (true) {
- final ChannelState ws = writeState;
+ final ChannelState ws = writeState.get();
if (DEBUG) {
System.out.printf("[Transport] write state: %s%n", ws);
}
@@ -524,13 +530,8 @@
if (DEBUG) {
System.out.printf("[Transport] registering write event%n");
}
- writeState = WAITING;
- try {
- channel.registerEvent(writeEvent);
- } catch (Throwable t) {
- writeState = UNREGISTERED;
- throw t;
- }
+ channel.registerEvent(writeEvent);
+ writeState.compareAndSet(UNREGISTERED, WAITING);
if (DEBUG) {
System.out.printf("[Transport] registered write event%n");
}
@@ -544,9 +545,11 @@
finished = true;
break loop; // All done
} else {
- writeState = UNREGISTERED;
+ writeState.compareAndSet(AVAILABLE, UNREGISTERED);
continue loop; // Effectively "goto UNREGISTERED"
}
+ case CLOSED:
+ throw new IOException("Output closed");
default:
throw new InternalError(String.valueOf(ws));
}
@@ -667,9 +670,18 @@
@Override
public void handle() {
if (DEBUG) {
- System.out.printf("[Transport] ready to write%n");
+ System.out.printf("[Transport] write event%n");
}
- writeState = AVAILABLE;
+ ChannelState s;
+ do {
+ s = writeState.get();
+ if (s == CLOSED) {
+ if (DEBUG) {
+ System.out.printf("[Transport] write state %s %n", s);
+ }
+ break;
+ }
+ } while (!writeState.compareAndSet(s, AVAILABLE));
sendScheduler.runOrSchedule();
}
}
@@ -684,7 +696,7 @@
@Override
public void handle() {
if (DEBUG) {
- System.out.printf("[Transport] ready to read%n");
+ System.out.printf("[Transport] read event%n");
}
readState = AVAILABLE;
receiveScheduler.runOrSchedule();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Fri Mar 09 16:47:00 2018 +0000
@@ -211,6 +211,7 @@
if (!isLegalToSendFromClient(statusCode)) {
return failedFuture(new IllegalArgumentException("statusCode"));
}
+ // check outputClosed
CompletableFuture<WebSocket> cf = sendClose0(statusCode, reason);
return replaceNull(cf);
}
@@ -432,7 +433,7 @@
.flip();
// Non-exclusive send;
BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> {
- if (e != null) {
+ if (e != null) { // Better error handing. What if already closed?
signalError(Utils.getCompletionCause(e));
}
};
--- a/test/jdk/ProblemList.txt Fri Mar 09 11:24:37 2018 +0000
+++ b/test/jdk/ProblemList.txt Fri Mar 09 16:47:00 2018 +0000
@@ -541,10 +541,6 @@
java/net/DatagramSocket/SendDatagramToBadAddress.java 7143960 macosx-all
-java/net/httpclient/websocket/WebSocketTest.java 8765432 generic-all
-java/net/httpclient/websocket/WebSocketTextTest.java 8765432 generic-all
-
-
############################################################################
# jdk_nio
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Fri Mar 09 11:24:37 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Fri Mar 09 16:47:00 2018 +0000
@@ -59,7 +59,7 @@
private static final Class<IllegalStateException> ISE = IllegalStateException.class;
private static final Class<IOException> IOE = IOException.class;
-// @Test
+ @Test
public void immediateAbort() throws Exception {
try (DummyWebSocketServer server = serverWithCannedData(0x81, 0x00, 0x88, 0x00)) {
server.open();
@@ -235,7 +235,6 @@
.join();
ws.sendClose(NORMAL_CLOSURE, "").join();
assertTrue(ws.isOutputClosed());
- assertFalse(ws.isInputClosed());
assertEquals(ws.getSubprotocol(), "");
ws.request(1); // No exceptions must be thrown
}
@@ -263,7 +262,6 @@
// The output closes even if the Close message has not been sent
assertFalse(cf.isDone());
assertTrue(ws.isOutputClosed());
- assertFalse(ws.isInputClosed());
assertEquals(ws.getSubprotocol(), "");
}
}
@@ -286,7 +284,7 @@
};
}
-// @Test
+ @Test
public void abortPendingSendBinary() throws Exception {
try (DummyWebSocketServer server = notReadingServer()) {
server.open();
@@ -313,7 +311,7 @@
}
}
-// @Test
+ @Test
public void abortPendingSendText() throws Exception {
try (DummyWebSocketServer server = notReadingServer()) {
server.open();