src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Mar 09 11:24:37 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Mar 09 16:47:00 2018 +0000
@@ -37,9 +37,11 @@
import java.nio.channels.SelectionKey;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.AVAILABLE;
+import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.CLOSED;
import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.UNREGISTERED;
import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.WAITING;
@@ -70,7 +72,8 @@
private final Object closeLock = new Object();
private final RawChannel.RawEvent writeEvent = new WriteEvent();
private final RawChannel.RawEvent readEvent = new ReadEvent();
- private volatile ChannelState writeState = UNREGISTERED;
+ private final AtomicReference<ChannelState> writeState
+ = new AtomicReference<>(UNREGISTERED);
private ByteBuffer data;
private volatile ChannelState readState = UNREGISTERED;
private boolean inputClosed;
@@ -274,6 +277,8 @@
}
}
}
+ writeState.set(CLOSED);
+ sendScheduler.runOrSchedule();
}
/*
@@ -300,11 +305,12 @@
}
}
- /* Common states for send and receive tasks */
+ /* Common states for send and receive tasks */
enum ChannelState {
UNREGISTERED,
AVAILABLE,
- WAITING
+ WAITING,
+ CLOSED,
}
@SuppressWarnings({"rawtypes"})
@@ -513,7 +519,7 @@
boolean finished = false;
loop:
while (true) {
- final ChannelState ws = writeState;
+ final ChannelState ws = writeState.get();
if (DEBUG) {
System.out.printf("[Transport] write state: %s%n", ws);
}
@@ -524,13 +530,8 @@
if (DEBUG) {
System.out.printf("[Transport] registering write event%n");
}
- writeState = WAITING;
- try {
- channel.registerEvent(writeEvent);
- } catch (Throwable t) {
- writeState = UNREGISTERED;
- throw t;
- }
+ channel.registerEvent(writeEvent);
+ writeState.compareAndSet(UNREGISTERED, WAITING);
if (DEBUG) {
System.out.printf("[Transport] registered write event%n");
}
@@ -544,9 +545,11 @@
finished = true;
break loop; // All done
} else {
- writeState = UNREGISTERED;
+ writeState.compareAndSet(AVAILABLE, UNREGISTERED);
continue loop; // Effectively "goto UNREGISTERED"
}
+ case CLOSED:
+ throw new IOException("Output closed");
default:
throw new InternalError(String.valueOf(ws));
}
@@ -667,9 +670,18 @@
@Override
public void handle() {
if (DEBUG) {
- System.out.printf("[Transport] ready to write%n");
+ System.out.printf("[Transport] write event%n");
}
- writeState = AVAILABLE;
+ ChannelState s;
+ do {
+ s = writeState.get();
+ if (s == CLOSED) {
+ if (DEBUG) {
+ System.out.printf("[Transport] write state %s %n", s);
+ }
+ break;
+ }
+ } while (!writeState.compareAndSet(s, AVAILABLE));
sendScheduler.runOrSchedule();
}
}
@@ -684,7 +696,7 @@
@Override
public void handle() {
if (DEBUG) {
- System.out.printf("[Transport] ready to read%n");
+ System.out.printf("[Transport] read event%n");
}
readState = AVAILABLE;
receiveScheduler.runOrSchedule();