src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
branchhttp-client-branch
changeset 56165 8a6065d830b9
parent 56092 fd85b2bf2b0d
child 56235 6218673d7fa0
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Thu Feb 22 17:33:21 2018 +0000
@@ -520,9 +520,17 @@
         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
             if (Thread.currentThread() == this) {
                 SelectionKey key = e.channel().keyFor(selector);
-                if (key != null) {
+                if (key != null && key.isValid()) {
                     SelectorAttachment sa = (SelectorAttachment) key.attachment();
-                    if (sa != null) sa.register(e);
+                    sa.register(e);
+                } else if (e.interestOps() != 0){
+                    // We don't care about paused events.
+                    // These are actually handled by
+                    // SelectorAttachment::resetInterestOps later on.
+                    // But if we reach here when trying to resume an
+                    // event then it's better to fail fast.
+                    debug.log(Level.DEBUG, "No key for channel");
+                    e.abort(new IOException("No key for channel"));
                 }
             } else {
                 register(e);
@@ -563,11 +571,13 @@
         public void run() {
             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
             List<AsyncEvent> readyList = new ArrayList<>();
+            List<Runnable> resetList = new ArrayList<>();
             try {
                 while (!Thread.currentThread().isInterrupted()) {
                     synchronized (this) {
                         assert errorList.isEmpty();
                         assert readyList.isEmpty();
+                        assert resetList.isEmpty();
                         for (AsyncEvent event : registrations) {
                             if (event instanceof AsyncTriggerEvent) {
                                 readyList.add(event);
@@ -673,9 +683,10 @@
                         owner.purgeTimeoutsAndReturnNextDeadline();
                         continue;
                     }
-                    Set<SelectionKey> keys = selector.selectedKeys();
 
+                    Set<SelectionKey> keys = selector.selectedKeys();
                     assert errorList.isEmpty();
+
                     for (SelectionKey key : keys) {
                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
                         if (!key.isValid()) {
@@ -697,17 +708,24 @@
                             continue;
                         }
                         sa.events(eventsOccurred).forEach(readyList::add);
-                        sa.resetInterestOps(eventsOccurred);
+                        resetList.add(() -> sa.resetInterestOps(eventsOccurred));
                     }
+
                     selector.selectNow(); // complete cancellation
                     selector.selectedKeys().clear();
 
-                    for (AsyncEvent event : readyList) {
-                        handleEvent(event, null); // will be delegated to executor
-                    }
+                    // handle selected events
+                    readyList.forEach((e) -> handleEvent(e, null));
                     readyList.clear();
+
+                    // handle errors (closed channels etc...)
                     errorList.forEach((p) -> handleEvent(p.first, p.second));
                     errorList.clear();
+
+                    // reset interest ops for selected channels
+                    resetList.forEach(r -> r.run());
+                    resetList.clear();
+
                 }
             } catch (Throwable e) {
                 //e.printStackTrace();
@@ -746,6 +764,20 @@
         }
     }
 
+    final String debugInterestOps(SelectableChannel channel) {
+        try {
+            SelectionKey key = channel.keyFor(selmgr.selector);
+            if (key == null) return "channel not registered with selector";
+            String keyInterestOps = key.isValid()
+                    ? "key.interestOps=" + key.interestOps() : "invalid key";
+            return String.format("channel registered with selector, %s, sa.interestOps=%s",
+                                 keyInterestOps,
+                                 ((SelectorAttachment)key.attachment()).interestOps);
+        } catch (Throwable t) {
+            return String.valueOf(t);
+        }
+    }
+
     /**
      * Tracks multiple user level registrations associated with one NIO
      * registration (SelectionKey). In this implementation, registrations
@@ -772,6 +804,9 @@
 
         void register(AsyncEvent e) throws ClosedChannelException {
             int newOps = e.interestOps();
+            // re register interest if we are not already interested
+            // in the event. If the event is paused, then the pause will
+            // be taken into account later when resetInterestOps is called.
             boolean reRegister = (interestOps & newOps) != newOps;
             interestOps |= newOps;
             pending.add(e);
@@ -779,9 +814,11 @@
                 // first time registration happens here also
                 try {
                     chan.register(selector, interestOps, this);
-                } catch (CancelledKeyException x) {
+                } catch (Throwable x) {
                     abortPending(x);
                 }
+            } else if (!chan.isOpen()) {
+                abortPending(new ClosedChannelException());
             }
         }
 
@@ -818,11 +855,20 @@
 
             this.interestOps = newOps;
             SelectionKey key = chan.keyFor(selector);
-            if (newOps == 0 && pending.isEmpty()) {
+            if (newOps == 0 && key != null && pending.isEmpty()) {
                 key.cancel();
             } else {
                 try {
+                    if (key == null || !key.isValid()) {
+                        throw new CancelledKeyException();
+                    }
                     key.interestOps(newOps);
+                    // double check after
+                    if (!chan.isOpen()) {
+                        abortPending(new ClosedChannelException());
+                        return;
+                    }
+                    assert key.interestOps() == newOps;
                 } catch (CancelledKeyException x) {
                     // channel may have been closed
                     debug.log(Level.DEBUG, "key cancelled for " + chan);