http-client-branch: (WebSocket) test fix + output closure http-client-branch
authorprappo
Fri, 09 Mar 2018 16:47:00 +0000
branchhttp-client-branch
changeset 56269 234813fd33bc
parent 56268 481d8c9acc7f
child 56270 5c861402c69e
child 56290 e178d19ff91c
http-client-branch: (WebSocket) test fix + output closure
src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java
test/jdk/ProblemList.txt
test/jdk/java/net/httpclient/websocket/WebSocketTest.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java	Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java	Fri Mar 09 16:47:00 2018 +0000
@@ -145,14 +145,12 @@
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
         detachedChannel.close();
     }
 
     @Override
     public String toString() {
-        return super.toString()+"("+ detachedChannel.toString() + ")";
+        return super.toString() + "(" + detachedChannel.toString() + ")";
     }
-
-
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java	Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java	Fri Mar 09 16:47:00 2018 +0000
@@ -42,24 +42,12 @@
  * A stateful producer of binary representations of WebSocket messages being
  * sent from the client to the server.
  *
- * An encoding methods are given original messages and byte buffers to put the
- * resulting bytes to.
- *
- * The method is called
- * repeatedly with a non-empty target buffer. Once the caller finds the buffer
- * unmodified after the call returns, the message has been completely encoded.
+ * An encoding method is given an original message and a byte buffer to put the
+ * resulting bytes to. The method is called until it returns true. Then the
+ * reset method is called. The whole sequence repeats with next message.
  */
-
-/*
- * The state of encoding.An instance of this class is passed sequentially between messages, so
- * every message in a sequence can check the context it is in and update it
- * if necessary.
- */
-
 public class MessageEncoder {
 
-    // FIXME: write frame method
-
     private final static boolean DEBUG = false;
 
     private final SecureRandom maskingKeySource = new SecureRandom();
@@ -67,8 +55,8 @@
     private final Frame.Masker payloadMasker = new Frame.Masker();
     private final CharsetEncoder charsetEncoder
             = StandardCharsets.UTF_8.newEncoder()
-            .onMalformedInput(CodingErrorAction.REPORT)
-            .onUnmappableCharacter(CodingErrorAction.REPORT);
+                                    .onMalformedInput(CodingErrorAction.REPORT)
+                                    .onUnmappableCharacter(CodingErrorAction.REPORT);
     /*
      * This buffer is used both to encode characters to UTF-8 and to calculate
      * the length of the resulting frame's payload. The length of the payload
@@ -86,9 +74,11 @@
     private boolean flushing;
     private boolean moreText = true;
     private long headerCount;
-    private boolean previousLast = true;
+    /* Has the previous frame got its fin flag set? */
+    private boolean previousFin = true;
+    /* Was the previous frame TEXT or a CONTINUATION thereof? */
     private boolean previousText;
-    private boolean closed;
+    private boolean closed; // TODO: too late, need to check it before accepting otherwise the queue might blow up
 
     /*
      * How many bytes of the current message have been already encoded.
@@ -120,7 +110,7 @@
     }
 
     public void reset() {
-        // Do not reset the message stream state fields, e.g. previousLast,
+        // Do not reset the message stream state fields, e.g. previousFin,
         // previousText. Just an individual message state:
         started = false;
         flushing = false;
@@ -144,7 +134,7 @@
             throw new IOException("Output closed");
         }
         if (!started) {
-            if (!previousText && !previousLast) {
+            if (!previousText && !previousFin) {
                 // Previous data message was a partial binary message
                 throw new IllegalStateException("Unexpected text message");
             }
@@ -170,6 +160,8 @@
                 System.out.printf("[Output] moreText%n");
             }
             if (!moreText) {
+                previousFin = last;
+                previousText = true;
                 return true;
             }
             intermediateBuffer.clear();
@@ -196,19 +188,16 @@
             if (DEBUG) {
                 System.out.printf("[Output] header #%s%n", headerCount);
             }
-            if (headerCount == 0) { // set once
-                previousLast = last;
-                previousText = true;
-            }
             intermediateBuffer.flip();
             headerBuffer.clear();
             int mask = maskingKeySource.nextInt();
-            Opcode opcode = previousLast && headerCount == 0
+            Opcode opcode = previousFin && headerCount == 0
                     ? Opcode.TEXT : Opcode.CONTINUATION;
+            boolean fin = last && !moreText;
             if (DEBUG) {
                 System.out.printf("[Output] opcode %s%n", opcode);
             }
-            headerWriter.fin(last && !moreText)
+            headerWriter.fin(fin)
                     .opcode(opcode)
                     .payloadLen(intermediateBuffer.remaining())
                     .mask(mask)
@@ -244,7 +233,7 @@
             throw new IOException("Output closed");
         }
         if (!started) {
-            if (previousText && !previousLast) {
+            if (previousText && !previousFin) {
                 // Previous data message was a partial text message
                 throw new IllegalStateException("Unexpected binary message");
             }
@@ -252,13 +241,13 @@
             int mask = maskingKeySource.nextInt();
             headerBuffer.clear();
             headerWriter.fin(last)
-                    .opcode(previousLast ? Opcode.BINARY : Opcode.CONTINUATION)
+                    .opcode(previousFin ? Opcode.BINARY : Opcode.CONTINUATION)
                     .payloadLen(expectedLen)
                     .mask(mask)
                     .write(headerBuffer);
             headerBuffer.flip();
             payloadMasker.mask(mask);
-            previousLast = last;
+            previousFin = last;
             previousText = false;
             started = true;
         }
@@ -412,5 +401,3 @@
         return maskAvailable(intermediateBuffer, dst) >= 0;
     }
 }
-
-
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java	Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java	Fri Mar 09 16:47:00 2018 +0000
@@ -92,6 +92,10 @@
      */
     void acknowledgeReception(); // TODO: hide
 
+    /*
+     * If this method is invoked, then all pending and subsequent send
+     * operations will fail with IOException.
+     */
     void closeOutput() throws IOException;
 
     void closeInput() throws IOException;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Fri Mar 09 16:47:00 2018 +0000
@@ -37,9 +37,11 @@
 import java.nio.channels.SelectionKey;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.AVAILABLE;
+import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.CLOSED;
 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.UNREGISTERED;
 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.WAITING;
 
@@ -70,7 +72,8 @@
     private final Object closeLock = new Object();
     private final RawChannel.RawEvent writeEvent = new WriteEvent();
     private final RawChannel.RawEvent readEvent = new ReadEvent();
-    private volatile ChannelState writeState = UNREGISTERED;
+    private final AtomicReference<ChannelState> writeState
+            = new AtomicReference<>(UNREGISTERED);
     private ByteBuffer data;
     private volatile ChannelState readState = UNREGISTERED;
     private boolean inputClosed;
@@ -274,6 +277,8 @@
                 }
             }
         }
+        writeState.set(CLOSED);
+        sendScheduler.runOrSchedule();
     }
 
     /*
@@ -300,11 +305,12 @@
         }
     }
 
-    /*  Common states for send and receive tasks */
+    /* Common states for send and receive tasks */
     enum ChannelState {
         UNREGISTERED,
         AVAILABLE,
-        WAITING
+        WAITING,
+        CLOSED,
     }
 
     @SuppressWarnings({"rawtypes"})
@@ -513,7 +519,7 @@
             boolean finished = false;
             loop:
             while (true) {
-                final ChannelState ws = writeState;
+                final ChannelState ws = writeState.get();
                 if (DEBUG) {
                     System.out.printf("[Transport] write state: %s%n", ws);
                 }
@@ -524,13 +530,8 @@
                         if (DEBUG) {
                             System.out.printf("[Transport] registering write event%n");
                         }
-                        writeState = WAITING;
-                        try {
-                            channel.registerEvent(writeEvent);
-                        } catch (Throwable t) {
-                            writeState = UNREGISTERED;
-                            throw t;
-                        }
+                        channel.registerEvent(writeEvent);
+                        writeState.compareAndSet(UNREGISTERED, WAITING);
                         if (DEBUG) {
                             System.out.printf("[Transport] registered write event%n");
                         }
@@ -544,9 +545,11 @@
                             finished = true;
                             break loop;   // All done
                         } else {
-                            writeState = UNREGISTERED;
+                            writeState.compareAndSet(AVAILABLE, UNREGISTERED);
                             continue loop; //  Effectively "goto UNREGISTERED"
                         }
+                    case CLOSED:
+                        throw new IOException("Output closed");
                     default:
                         throw new InternalError(String.valueOf(ws));
                 }
@@ -667,9 +670,18 @@
         @Override
         public void handle() {
             if (DEBUG) {
-                System.out.printf("[Transport] ready to write%n");
+                System.out.printf("[Transport] write event%n");
             }
-            writeState = AVAILABLE;
+            ChannelState s;
+            do {
+                s = writeState.get();
+                if (s == CLOSED) {
+                    if (DEBUG) {
+                        System.out.printf("[Transport] write state %s %n", s);
+                    }
+                    break;
+                }
+            } while (!writeState.compareAndSet(s, AVAILABLE));
             sendScheduler.runOrSchedule();
         }
     }
@@ -684,7 +696,7 @@
         @Override
         public void handle() {
             if (DEBUG) {
-                System.out.printf("[Transport] ready to read%n");
+                System.out.printf("[Transport] read event%n");
             }
             readState = AVAILABLE;
             receiveScheduler.runOrSchedule();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java	Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java	Fri Mar 09 16:47:00 2018 +0000
@@ -211,6 +211,7 @@
         if (!isLegalToSendFromClient(statusCode)) {
             return failedFuture(new IllegalArgumentException("statusCode"));
         }
+        // check outputClosed
         CompletableFuture<WebSocket> cf = sendClose0(statusCode, reason);
         return replaceNull(cf);
     }
@@ -432,7 +433,7 @@
                     .flip();
             // Non-exclusive send;
             BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> {
-                if (e != null) {
+                if (e != null) { // Better error handing. What if already closed?
                     signalError(Utils.getCompletionCause(e));
                 }
             };
--- a/test/jdk/ProblemList.txt	Fri Mar 09 11:24:37 2018 +0000
+++ b/test/jdk/ProblemList.txt	Fri Mar 09 16:47:00 2018 +0000
@@ -541,10 +541,6 @@
 
 java/net/DatagramSocket/SendDatagramToBadAddress.java           7143960 macosx-all
 
-java/net/httpclient/websocket/WebSocketTest.java                8765432  generic-all
-java/net/httpclient/websocket/WebSocketTextTest.java            8765432  generic-all
-
-
 ############################################################################
 
 # jdk_nio
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Fri Mar 09 11:24:37 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Fri Mar 09 16:47:00 2018 +0000
@@ -59,7 +59,7 @@
     private static final Class<IllegalStateException> ISE = IllegalStateException.class;
     private static final Class<IOException> IOE = IOException.class;
 
-//    @Test
+    @Test
     public void immediateAbort() throws Exception {
         try (DummyWebSocketServer server = serverWithCannedData(0x81, 0x00, 0x88, 0x00)) {
             server.open();
@@ -235,7 +235,6 @@
                     .join();
             ws.sendClose(NORMAL_CLOSURE, "").join();
             assertTrue(ws.isOutputClosed());
-            assertFalse(ws.isInputClosed());
             assertEquals(ws.getSubprotocol(), "");
             ws.request(1); // No exceptions must be thrown
         }
@@ -263,7 +262,6 @@
             // The output closes even if the Close message has not been sent
             assertFalse(cf.isDone());
             assertTrue(ws.isOutputClosed());
-            assertFalse(ws.isInputClosed());
             assertEquals(ws.getSubprotocol(), "");
         }
     }
@@ -286,7 +284,7 @@
         };
     }
 
-//    @Test
+    @Test
     public void abortPendingSendBinary() throws Exception {
         try (DummyWebSocketServer server = notReadingServer()) {
             server.open();
@@ -313,7 +311,7 @@
         }
     }
 
-//    @Test
+    @Test
     public void abortPendingSendText() throws Exception {
         try (DummyWebSocketServer server = notReadingServer()) {
             server.open();