--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Thu Feb 22 17:33:21 2018 +0000
@@ -520,9 +520,17 @@
void eventUpdated(AsyncEvent e) throws ClosedChannelException {
if (Thread.currentThread() == this) {
SelectionKey key = e.channel().keyFor(selector);
- if (key != null) {
+ if (key != null && key.isValid()) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
- if (sa != null) sa.register(e);
+ sa.register(e);
+ } else if (e.interestOps() != 0){
+ // We don't care about paused events.
+ // These are actually handled by
+ // SelectorAttachment::resetInterestOps later on.
+ // But if we reach here when trying to resume an
+ // event then it's better to fail fast.
+ debug.log(Level.DEBUG, "No key for channel");
+ e.abort(new IOException("No key for channel"));
}
} else {
register(e);
@@ -563,11 +571,13 @@
public void run() {
List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
List<AsyncEvent> readyList = new ArrayList<>();
+ List<Runnable> resetList = new ArrayList<>();
try {
while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
assert errorList.isEmpty();
assert readyList.isEmpty();
+ assert resetList.isEmpty();
for (AsyncEvent event : registrations) {
if (event instanceof AsyncTriggerEvent) {
readyList.add(event);
@@ -673,9 +683,10 @@
owner.purgeTimeoutsAndReturnNextDeadline();
continue;
}
- Set<SelectionKey> keys = selector.selectedKeys();
+ Set<SelectionKey> keys = selector.selectedKeys();
assert errorList.isEmpty();
+
for (SelectionKey key : keys) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
if (!key.isValid()) {
@@ -697,17 +708,24 @@
continue;
}
sa.events(eventsOccurred).forEach(readyList::add);
- sa.resetInterestOps(eventsOccurred);
+ resetList.add(() -> sa.resetInterestOps(eventsOccurred));
}
+
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
- for (AsyncEvent event : readyList) {
- handleEvent(event, null); // will be delegated to executor
- }
+ // handle selected events
+ readyList.forEach((e) -> handleEvent(e, null));
readyList.clear();
+
+ // handle errors (closed channels etc...)
errorList.forEach((p) -> handleEvent(p.first, p.second));
errorList.clear();
+
+ // reset interest ops for selected channels
+ resetList.forEach(r -> r.run());
+ resetList.clear();
+
}
} catch (Throwable e) {
//e.printStackTrace();
@@ -746,6 +764,20 @@
}
}
+ final String debugInterestOps(SelectableChannel channel) {
+ try {
+ SelectionKey key = channel.keyFor(selmgr.selector);
+ if (key == null) return "channel not registered with selector";
+ String keyInterestOps = key.isValid()
+ ? "key.interestOps=" + key.interestOps() : "invalid key";
+ return String.format("channel registered with selector, %s, sa.interestOps=%s",
+ keyInterestOps,
+ ((SelectorAttachment)key.attachment()).interestOps);
+ } catch (Throwable t) {
+ return String.valueOf(t);
+ }
+ }
+
/**
* Tracks multiple user level registrations associated with one NIO
* registration (SelectionKey). In this implementation, registrations
@@ -772,6 +804,9 @@
void register(AsyncEvent e) throws ClosedChannelException {
int newOps = e.interestOps();
+ // re register interest if we are not already interested
+ // in the event. If the event is paused, then the pause will
+ // be taken into account later when resetInterestOps is called.
boolean reRegister = (interestOps & newOps) != newOps;
interestOps |= newOps;
pending.add(e);
@@ -779,9 +814,11 @@
// first time registration happens here also
try {
chan.register(selector, interestOps, this);
- } catch (CancelledKeyException x) {
+ } catch (Throwable x) {
abortPending(x);
}
+ } else if (!chan.isOpen()) {
+ abortPending(new ClosedChannelException());
}
}
@@ -818,11 +855,20 @@
this.interestOps = newOps;
SelectionKey key = chan.keyFor(selector);
- if (newOps == 0 && pending.isEmpty()) {
+ if (newOps == 0 && key != null && pending.isEmpty()) {
key.cancel();
} else {
try {
+ if (key == null || !key.isValid()) {
+ throw new CancelledKeyException();
+ }
key.interestOps(newOps);
+ // double check after
+ if (!chan.isOpen()) {
+ abortPending(new ClosedChannelException());
+ return;
+ }
+ assert key.interestOps() == newOps;
} catch (CancelledKeyException x) {
// channel may have been closed
debug.log(Level.DEBUG, "key cancelled for " + chan);