src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
branchhttp-client-branch
changeset 56299 903ff8ec239d
parent 56282 10cebcd18d47
child 56305 9027d1747dd0
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Tue Mar 13 17:37:30 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Tue Mar 13 20:17:12 2018 +0000
@@ -46,6 +46,7 @@
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -373,6 +374,10 @@
         selmgr.cancel(s);
     }
 
+    void detachChannel(SocketChannel s, AsyncEvent... events) {
+        selmgr.detach(s, events);
+    }
+
     /**
      * Allows an AsyncEvent to modify its interestOps.
      * @param event The modified event.
@@ -509,6 +514,7 @@
         private final Selector selector;
         private volatile boolean closed;
         private final List<AsyncEvent> registrations;
+        private final List<AsyncTriggerEvent> deregistrations;
         private final System.Logger debug;
         private final System.Logger debugtimeout;
         HttpClientImpl owner;
@@ -521,9 +527,41 @@
             debugtimeout = ref.debugtimeout;
             pool = ref.connectionPool();
             registrations = new ArrayList<>();
+            deregistrations = new ArrayList<>();
             selector = Selector.open();
         }
 
+        void detach(SelectableChannel channel, AsyncEvent... events) {
+            if (Thread.currentThread() == this) {
+                SelectionKey key = channel.keyFor(selector);
+                if (key != null) {
+                    boolean removed = false;
+                    SelectorAttachment sa = (SelectorAttachment) key.attachment();
+                    if (sa != null) {
+                        for (AsyncEvent e : events) {
+                            if (sa.pending.remove(e)) removed = true;
+                        }
+                        // The key could already have been cancelled, in which
+                        // case the events would already have been removed.
+                        if (removed) {
+                            // We found at least one of the events, so we
+                            // should now cancel the key.
+                            sa.resetInterestOps(0);
+                            key.cancel();
+                        }
+                    }
+                }
+                registrations.removeAll(Arrays.asList(events));
+            } else {
+                synchronized (this) {
+                    deregistrations.add(new AsyncTriggerEvent(
+                            (x) -> debug.log(Level.DEBUG,
+                                    "Unexpected exception raised while detaching channel", x),
+                            () -> detach(channel, events)));
+                }
+            }
+        }
+
         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
             if (Thread.currentThread() == this) {
                 SelectionKey key = e.channel().keyFor(selector);
@@ -585,6 +623,10 @@
                         assert errorList.isEmpty();
                         assert readyList.isEmpty();
                         assert resetList.isEmpty();
+                        for (AsyncTriggerEvent event : deregistrations) {
+                            event.handle();
+                        }
+                        deregistrations.clear();
                         for (AsyncEvent event : registrations) {
                             if (event instanceof AsyncTriggerEvent) {
                                 readyList.add(event);
@@ -829,6 +871,10 @@
             }
         }
 
+        boolean deregister(AsyncEvent e) {
+            return pending.remove(e);
+        }
+
         /**
          * Returns a Stream<AsyncEvents> containing only events that are
          * registered with the given {@code interestOps}.