--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Tue Mar 13 17:37:30 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Tue Mar 13 20:17:12 2018 +0000
@@ -46,6 +46,7 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -373,6 +374,10 @@
selmgr.cancel(s);
}
+ void detachChannel(SocketChannel s, AsyncEvent... events) {
+ selmgr.detach(s, events);
+ }
+
/**
* Allows an AsyncEvent to modify its interestOps.
* @param event The modified event.
@@ -509,6 +514,7 @@
private final Selector selector;
private volatile boolean closed;
private final List<AsyncEvent> registrations;
+ private final List<AsyncTriggerEvent> deregistrations;
private final System.Logger debug;
private final System.Logger debugtimeout;
HttpClientImpl owner;
@@ -521,9 +527,41 @@
debugtimeout = ref.debugtimeout;
pool = ref.connectionPool();
registrations = new ArrayList<>();
+ deregistrations = new ArrayList<>();
selector = Selector.open();
}
+ void detach(SelectableChannel channel, AsyncEvent... events) {
+ if (Thread.currentThread() == this) {
+ SelectionKey key = channel.keyFor(selector);
+ if (key != null) {
+ boolean removed = false;
+ SelectorAttachment sa = (SelectorAttachment) key.attachment();
+ if (sa != null) {
+ for (AsyncEvent e : events) {
+ if (sa.pending.remove(e)) removed = true;
+ }
+ // The key could already have been cancelled, in which
+ // case the events would already have been removed.
+ if (removed) {
+ // We found at least one of the events, so we
+ // should now cancel the key.
+ sa.resetInterestOps(0);
+ key.cancel();
+ }
+ }
+ }
+ registrations.removeAll(Arrays.asList(events));
+ } else {
+ synchronized (this) {
+ deregistrations.add(new AsyncTriggerEvent(
+ (x) -> debug.log(Level.DEBUG,
+ "Unexpected exception raised while detaching channel", x),
+ () -> detach(channel, events)));
+ }
+ }
+ }
+
void eventUpdated(AsyncEvent e) throws ClosedChannelException {
if (Thread.currentThread() == this) {
SelectionKey key = e.channel().keyFor(selector);
@@ -585,6 +623,10 @@
assert errorList.isEmpty();
assert readyList.isEmpty();
assert resetList.isEmpty();
+ for (AsyncTriggerEvent event : deregistrations) {
+ event.handle();
+ }
+ deregistrations.clear();
for (AsyncEvent event : registrations) {
if (event instanceof AsyncTriggerEvent) {
readyList.add(event);
@@ -829,6 +871,10 @@
}
}
+ boolean deregister(AsyncEvent e) {
+ return pending.remove(e);
+ }
+
/**
* Returns a Stream<AsyncEvents> containing only events that are
* registered with the given {@code interestOps}.