src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
branchhttp-client-branch
changeset 56269 234813fd33bc
parent 56263 4933a477d628
child 56291 c8c4c707ff3a
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Fri Mar 09 16:47:00 2018 +0000
@@ -37,9 +37,11 @@
 import java.nio.channels.SelectionKey;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.AVAILABLE;
+import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.CLOSED;
 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.UNREGISTERED;
 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.WAITING;
 
@@ -70,7 +72,8 @@
     private final Object closeLock = new Object();
     private final RawChannel.RawEvent writeEvent = new WriteEvent();
     private final RawChannel.RawEvent readEvent = new ReadEvent();
-    private volatile ChannelState writeState = UNREGISTERED;
+    private final AtomicReference<ChannelState> writeState
+            = new AtomicReference<>(UNREGISTERED);
     private ByteBuffer data;
     private volatile ChannelState readState = UNREGISTERED;
     private boolean inputClosed;
@@ -274,6 +277,8 @@
                 }
             }
         }
+        writeState.set(CLOSED);
+        sendScheduler.runOrSchedule();
     }
 
     /*
@@ -300,11 +305,12 @@
         }
     }
 
-    /*  Common states for send and receive tasks */
+    /* Common states for send and receive tasks */
     enum ChannelState {
         UNREGISTERED,
         AVAILABLE,
-        WAITING
+        WAITING,
+        CLOSED,
     }
 
     @SuppressWarnings({"rawtypes"})
@@ -513,7 +519,7 @@
             boolean finished = false;
             loop:
             while (true) {
-                final ChannelState ws = writeState;
+                final ChannelState ws = writeState.get();
                 if (DEBUG) {
                     System.out.printf("[Transport] write state: %s%n", ws);
                 }
@@ -524,13 +530,8 @@
                         if (DEBUG) {
                             System.out.printf("[Transport] registering write event%n");
                         }
-                        writeState = WAITING;
-                        try {
-                            channel.registerEvent(writeEvent);
-                        } catch (Throwable t) {
-                            writeState = UNREGISTERED;
-                            throw t;
-                        }
+                        channel.registerEvent(writeEvent);
+                        writeState.compareAndSet(UNREGISTERED, WAITING);
                         if (DEBUG) {
                             System.out.printf("[Transport] registered write event%n");
                         }
@@ -544,9 +545,11 @@
                             finished = true;
                             break loop;   // All done
                         } else {
-                            writeState = UNREGISTERED;
+                            writeState.compareAndSet(AVAILABLE, UNREGISTERED);
                             continue loop; //  Effectively "goto UNREGISTERED"
                         }
+                    case CLOSED:
+                        throw new IOException("Output closed");
                     default:
                         throw new InternalError(String.valueOf(ws));
                 }
@@ -667,9 +670,18 @@
         @Override
         public void handle() {
             if (DEBUG) {
-                System.out.printf("[Transport] ready to write%n");
+                System.out.printf("[Transport] write event%n");
             }
-            writeState = AVAILABLE;
+            ChannelState s;
+            do {
+                s = writeState.get();
+                if (s == CLOSED) {
+                    if (DEBUG) {
+                        System.out.printf("[Transport] write state %s %n", s);
+                    }
+                    break;
+                }
+            } while (!writeState.compareAndSet(s, AVAILABLE));
             sendScheduler.runOrSchedule();
         }
     }
@@ -684,7 +696,7 @@
         @Override
         public void handle() {
             if (DEBUG) {
-                System.out.printf("[Transport] ready to read%n");
+                System.out.printf("[Transport] read event%n");
             }
             readState = AVAILABLE;
             receiveScheduler.runOrSchedule();