--- a/src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -27,15 +27,11 @@
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
-import java.util.BitSet;
import java.util.Deque;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -67,14 +63,11 @@
// maps file descriptor to selection key, synchronize on selector
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
- // file descriptors registered with epoll, synchronize on selector
- private final BitSet registered = new BitSet();
-
- // pending new registrations/updates, queued by implRegister and putEventOps
+ // pending new registrations/updates, queued by implRegister and putEventOpos
private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
- private final Deque<Integer> updateOps = new ArrayDeque<>();
+ private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing
private final Object interruptLock = new Object();
@@ -113,15 +106,17 @@
protected int doSelect(long timeout) throws IOException {
assert Thread.holdsLock(this);
+ // epoll_wait timeout is int
+ int to = (int) Math.min(timeout, Integer.MAX_VALUE);
+ boolean blocking = (to != 0);
+ boolean timedPoll = (to > 0);
+
int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
- begin();
+ begin(blocking);
- // epoll_wait timeout is int
- int to = (int) Math.min(timeout, Integer.MAX_VALUE);
- boolean timedPoll = (to > 0);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
@@ -138,7 +133,7 @@
assert IOStatus.check(numEntries);
} finally {
- end();
+ end(blocking);
}
processDeregisterQueue();
return updateSelectedKeys(numEntries);
@@ -156,33 +151,34 @@
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
- SelChImpl ch = ski.channel;
- int fd = ch.getFDVal();
+ int fd = ski.channel.getFDVal();
SelectionKeyImpl previous = fdToKey.put(fd, ski);
assert previous == null;
- assert registered.get(fd) == false;
+ assert ski.registeredEvents() == 0;
}
}
// changes to interest ops
- assert updateKeys.size() == updateOps.size();
+ assert updateKeys.size() == updateEvents.size();
while ((ski = updateKeys.pollFirst()) != null) {
- int ops = updateOps.pollFirst();
+ int newEvents = updateEvents.pollFirst();
int fd = ski.channel.getFDVal();
if (ski.isValid() && fdToKey.containsKey(fd)) {
- if (registered.get(fd)) {
- if (ops == 0) {
+ int registeredEvents = ski.registeredEvents();
+ if (newEvents != registeredEvents) {
+ if (newEvents == 0) {
// remove from epoll
EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
- registered.clear(fd);
} else {
- // modify events
- EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, ops);
+ if (registeredEvents == 0) {
+ // add to epoll
+ EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
+ } else {
+ // modify events
+ EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
+ }
}
- } else if (ops != 0) {
- // add to epoll
- EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, ops);
- registered.set(fd);
+ ski.registeredEvents(newEvents);
}
}
}
@@ -190,8 +186,9 @@
}
/**
- * Update the keys whose fd's have been selected by the epoll.
- * Add the ready keys to the ready queue.
+ * Update the keys of file descriptors that were polled and add them to
+ * the selected-key set.
+ * If the interrupt fd has been selected, drain it and clear the interrupt.
*/
private int updateSelectedKeys(int numEntries) throws IOException {
assert Thread.holdsLock(this);
@@ -233,7 +230,6 @@
@Override
protected void implClose() throws IOException {
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
// prevent further wakeup
synchronized (interruptLock) {
@@ -245,59 +241,37 @@
FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1);
-
- // Deregister channels
- Iterator<SelectionKey> i = keys.iterator();
- while (i.hasNext()) {
- SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
- deregister(ski);
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
- i.remove();
- }
}
@Override
protected void implRegister(SelectionKeyImpl ski) {
- assert Thread.holdsLock(nioKeys());
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
- keys.add(ski);
}
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid();
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
- assert Thread.holdsLock(nioSelectedKeys());
int fd = ski.channel.getFDVal();
- fdToKey.remove(fd);
- if (registered.get(fd)) {
- EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
- registered.clear(fd);
+ if (fdToKey.remove(fd) != null) {
+ if (ski.registeredEvents() != 0) {
+ EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
+ ski.registeredEvents(0);
+ }
+ } else {
+ assert ski.registeredEvents() == 0;
}
-
- selectedKeys.remove(ski);
- keys.remove(ski);
-
- // remove from channel's key set
- deregister(ski);
-
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl) selch).kill();
}
@Override
- public void putEventOps(SelectionKeyImpl ski, int ops) {
+ public void putEventOps(SelectionKeyImpl ski, int events) {
ensureOpen();
synchronized (updateLock) {
- updateOps.addLast(ops); // ops first in case adding the key fails
+ updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski);
}
}
--- a/src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -27,15 +27,11 @@
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
-import java.util.BitSet;
import java.util.Deque;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -66,15 +62,11 @@
// maps file descriptor to selection key, synchronize on selector
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
- // file descriptors registered with kqueue, synchronize on selector
- private final BitSet registeredReadFilter = new BitSet();
- private final BitSet registeredWriteFilter = new BitSet();
-
// pending new registrations/updates, queued by implRegister and putEventOps
private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
- private final Deque<Integer> updateOps = new ArrayDeque<>();
+ private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing
private final Object interruptLock = new Object();
@@ -113,14 +105,16 @@
protected int doSelect(long timeout) throws IOException {
assert Thread.holdsLock(this);
+ long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
+ boolean blocking = (to != 0);
+ boolean timedPoll = (to > 0);
+
int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
- begin();
+ begin(blocking);
- long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
- boolean timedPoll = (to > 0);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
@@ -137,7 +131,7 @@
assert IOStatus.check(numEntries);
} finally {
- end();
+ end(blocking);
}
processDeregisterQueue();
return updateSelectedKeys(numEntries);
@@ -155,41 +149,41 @@
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
- SelChImpl ch = ski.channel;
- int fd = ch.getFDVal();
+ int fd = ski.channel.getFDVal();
SelectionKeyImpl previous = fdToKey.put(fd, ski);
assert previous == null;
- assert registeredReadFilter.get(fd) == false;
- assert registeredWriteFilter.get(fd) == false;
+ assert ski.registeredEvents() == 0;
}
}
// changes to interest ops
- assert updateKeys.size() == updateOps.size();
+ assert updateKeys.size() == updateKeys.size();
while ((ski = updateKeys.pollFirst()) != null) {
- int ops = updateOps.pollFirst();
+ int newEvents = updateEvents.pollFirst();
int fd = ski.channel.getFDVal();
if (ski.isValid() && fdToKey.containsKey(fd)) {
- // add or delete interest in read events
- if (registeredReadFilter.get(fd)) {
- if ((ops & Net.POLLIN) == 0) {
- KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
- registeredReadFilter.clear(fd);
+ int registeredEvents = ski.registeredEvents();
+ if (newEvents != registeredEvents) {
+
+ // add or delete interest in read events
+ if ((registeredEvents & Net.POLLIN) != 0) {
+ if ((newEvents & Net.POLLIN) == 0) {
+ KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
+ }
+ } else if ((newEvents & Net.POLLIN) != 0) {
+ KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
}
- } else if ((ops & Net.POLLIN) != 0) {
- KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
- registeredReadFilter.set(fd);
- }
- // add or delete interest in write events
- if (registeredWriteFilter.get(fd)) {
- if ((ops & Net.POLLOUT) == 0) {
- KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
- registeredWriteFilter.clear(fd);
+ // add or delete interest in write events
+ if ((registeredEvents & Net.POLLOUT) != 0) {
+ if ((newEvents & Net.POLLOUT) == 0) {
+ KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
+ }
+ } else if ((newEvents & Net.POLLOUT) != 0) {
+ KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
}
- } else if ((ops & Net.POLLOUT) != 0) {
- KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
- registeredWriteFilter.set(fd);
+
+ ski.registeredEvents(newEvents);
}
}
}
@@ -197,8 +191,8 @@
}
/**
- * Update the keys whose fd's have been selected by kqueue.
- * Add the ready keys to the selected key set.
+ * Update the keys of file descriptors that were polled and add them to
+ * the selected-key set.
* If the interrupt fd has been selected, drain it and clear the interrupt.
*/
private int updateSelectedKeys(int numEntries) throws IOException {
@@ -265,7 +259,6 @@
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
// prevent further wakeup
synchronized (interruptLock) {
@@ -277,63 +270,41 @@
FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1);
-
- // Deregister channels
- Iterator<SelectionKey> i = keys.iterator();
- while (i.hasNext()) {
- SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
- deregister(ski);
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
- i.remove();
- }
}
@Override
protected void implRegister(SelectionKeyImpl ski) {
- assert Thread.holdsLock(nioKeys());
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
- keys.add(ski);
}
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid();
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
- assert Thread.holdsLock(nioSelectedKeys());
int fd = ski.channel.getFDVal();
- fdToKey.remove(fd);
- if (registeredReadFilter.get(fd)) {
- KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
- registeredReadFilter.clear(fd);
- }
- if (registeredWriteFilter.get(fd)) {
- KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
- registeredWriteFilter.clear(fd);
+ int registeredEvents = ski.registeredEvents();
+ if (fdToKey.remove(fd) != null) {
+ if (registeredEvents != 0) {
+ if ((registeredEvents & Net.POLLIN) != 0)
+ KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
+ if ((registeredEvents & Net.POLLOUT) != 0)
+ KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
+ ski.registeredEvents(0);
+ }
+ } else {
+ assert registeredEvents == 0;
}
-
- selectedKeys.remove(ski);
- keys.remove(ski);
-
- // remove from channel's key set
- deregister(ski);
-
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl) selch).kill();
}
@Override
- public void putEventOps(SelectionKeyImpl ski, int ops) {
+ public void putEventOps(SelectionKeyImpl ski, int events) {
ensureOpen();
synchronized (updateLock) {
- updateOps.addLast(ops); // ops first in case adding the key fails
+ updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski);
}
}
--- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -1229,10 +1229,9 @@
/**
* Translates native poll revent set into a ready operation set
*/
- public boolean translateReadyOps(int ops, int initialOps,
- SelectionKeyImpl sk) {
- int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
- int oldOps = sk.nioReadyOps();
+ public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
+ int intOps = ski.nioInterestOps();
+ int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0) {
@@ -1244,7 +1243,7 @@
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
@@ -1256,16 +1255,16 @@
((intOps & SelectionKey.OP_WRITE) != 0))
newOps |= SelectionKey.OP_WRITE;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
- public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, sk.nioReadyOps(), sk);
+ public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, ski.nioReadyOps(), ski);
}
- public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, 0, sk);
+ public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, 0, ski);
}
/**
@@ -1295,16 +1294,15 @@
/**
* Translates an interest operation set into a native poll event set
*/
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
int newOps = 0;
-
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= Net.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= Net.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= Net.POLLIN;
- sk.selector.putEventOps(sk, newOps);
+ return newOps;
}
public FileDescriptor getFD() {
--- a/src/java.base/share/classes/sun/nio/ch/SelChImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/share/classes/sun/nio/ch/SelChImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -61,7 +61,10 @@
*/
boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk);
- void translateAndSetInterestOps(int ops, SelectionKeyImpl sk);
+ /**
+ * Translates an interest operation set into a native event set
+ */
+ int translateInterestOps(int ops);
void kill() throws IOException;
--- a/src/java.base/share/classes/sun/nio/ch/SelectionKeyImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/share/classes/sun/nio/ch/SelectionKeyImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -39,21 +39,28 @@
public final class SelectionKeyImpl
extends AbstractSelectionKey
{
-
final SelChImpl channel; // package-private
- public final SelectorImpl selector;
-
- // Index for a pollfd array in Selector that this key is registered with
- private int index;
+ private final SelectorImpl selector;
private volatile int interestOps;
private volatile int readyOps;
+ // registered events in kernel, used by some Selector implementations
+ private int registeredEvents;
+
+ // index of key in pollfd array, used by some Selector implementations
+ private int index;
+
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}
+ private void ensureValid() {
+ if (!isValid())
+ throw new CancelledKeyException();
+ }
+
@Override
public SelectableChannel channel() {
return (SelectableChannel)channel;
@@ -61,20 +68,7 @@
@Override
public Selector selector() {
- return (Selector)selector;
- }
-
- int getIndex() { // package-private
- return index;
- }
-
- void setIndex(int i) { // package-private
- index = i;
- }
-
- private void ensureValid() {
- if (!isValid())
- throw new CancelledKeyException();
+ return selector;
}
@Override
@@ -109,7 +103,7 @@
public SelectionKey nioInterestOps(int ops) {
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
- channel.translateAndSetInterestOps(ops, this);
+ selector.putEventOps(this, channel.translateInterestOps(ops));
interestOps = ops;
return this;
}
@@ -118,6 +112,24 @@
return interestOps;
}
+ void registeredEvents(int events) {
+ // assert Thread.holdsLock(selector);
+ this.registeredEvents = events;
+ }
+
+ int registeredEvents() {
+ // assert Thread.holdsLock(selector);
+ return registeredEvents;
+ }
+
+ int getIndex() {
+ return index;
+ }
+
+ void setIndex(int i) {
+ index = i;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
--- a/src/java.base/share/classes/sun/nio/ch/SelectorImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/share/classes/sun/nio/ch/SelectorImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -26,9 +26,9 @@
package sun.nio.ch;
import java.io.IOException;
-import java.net.SocketException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.IllegalSelectorException;
+import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.AbstractSelector;
@@ -47,7 +47,7 @@
extends AbstractSelector
{
// The set of keys registered with this Selector
- protected final HashSet<SelectionKey> keys;
+ protected final Set<SelectionKey> keys;
// The set of keys with data ready for an operation
protected final Set<SelectionKey> selectedKeys;
@@ -88,6 +88,26 @@
return publicSelectedKeys;
}
+ /**
+ * Marks the beginning of a select operation that might block
+ */
+ protected final void begin(boolean blocking) {
+ if (blocking) begin();
+ }
+
+ /**
+ * Marks the end of a select operation that may have blocked
+ */
+ protected final void end(boolean blocking) {
+ if (blocking) end();
+ }
+
+ /**
+ * Selects the keys for channels that are ready for I/O operations.
+ *
+ * @param timeout timeout in milliseconds to wait, 0 to not wait, -1 to
+ * wait indefinitely
+ */
protected abstract int doSelect(long timeout) throws IOException;
private int lockAndDoSelect(long timeout) throws IOException {
@@ -125,9 +145,21 @@
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
+ implClose();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
- implClose();
+ // Deregister channels
+ Iterator<SelectionKey> i = keys.iterator();
+ while (i.hasNext()) {
+ SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
+ deregister(ski);
+ SelectableChannel selch = ski.channel();
+ if (!selch.isOpen() && !selch.isRegistered())
+ ((SelChImpl)selch).kill();
+ selectedKeys.remove(ski);
+ i.remove();
+ }
+ assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}
@@ -144,8 +176,10 @@
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
+ // register before adding to key set
+ implRegister(k);
synchronized (publicKeys) {
- implRegister(k);
+ keys.add(k);
}
k.interestOps(ops);
return k;
@@ -156,27 +190,37 @@
protected abstract void implDereg(SelectionKeyImpl ski) throws IOException;
protected final void processDeregisterQueue() throws IOException {
- // Precondition: Synchronized on this, keys, and selectedKeys
+ assert Thread.holdsLock(this);
+ assert Thread.holdsLock(publicKeys);
+ assert Thread.holdsLock(publicSelectedKeys);
+
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
- try {
- implDereg(ski);
- } catch (SocketException se) {
- throw new IOException("Error deregistering key", se);
- } finally {
- i.remove();
- }
+ i.remove();
+
+ // remove the key from the selector
+ implDereg(ski);
+
+ selectedKeys.remove(ski);
+ keys.remove(ski);
+
+ // remove from channel's key set
+ deregister(ski);
+
+ SelectableChannel ch = ski.channel();
+ if (!ch.isOpen() && !ch.isRegistered())
+ ((SelChImpl)ch).kill();
}
}
}
}
/**
- * Invoked to change the key's interest set
+ * Change the event set in the selector
*/
- public abstract void putEventOps(SelectionKeyImpl ski, int ops);
+ protected abstract void putEventOps(SelectionKeyImpl ski, int events);
}
--- a/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -445,10 +445,9 @@
/**
* Translates native poll revent set into a ready operation set
*/
- public boolean translateReadyOps(int ops, int initialOps,
- SelectionKeyImpl sk) {
- int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
- int oldOps = sk.nioReadyOps();
+ public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
+ int intOps = ski.nioInterestOps();
+ int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0) {
@@ -460,7 +459,7 @@
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
@@ -468,29 +467,26 @@
((intOps & SelectionKey.OP_ACCEPT) != 0))
newOps |= SelectionKey.OP_ACCEPT;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
- public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, sk.nioReadyOps(), sk);
+ public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, ski.nioReadyOps(), ski);
}
- public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, 0, sk);
+ public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, 0, ski);
}
/**
* Translates an interest operation set into a native poll event set
*/
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
int newOps = 0;
-
- // Translate ops
if ((ops & SelectionKey.OP_ACCEPT) != 0)
newOps |= Net.POLLIN;
- // Place ops into pollfd array
- sk.selector.putEventOps(sk, newOps);
+ return newOps;
}
public FileDescriptor getFD() {
--- a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -994,10 +994,9 @@
/**
* Translates native poll revent ops into a ready operation ops
*/
- public boolean translateReadyOps(int ops, int initialOps,
- SelectionKeyImpl sk) {
- int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
- int oldOps = sk.nioReadyOps();
+ public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
+ int intOps = ski.nioInterestOps();
+ int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0) {
@@ -1009,7 +1008,7 @@
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
@@ -1026,22 +1025,22 @@
((intOps & SelectionKey.OP_WRITE) != 0) && connected)
newOps |= SelectionKey.OP_WRITE;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
- public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, sk.nioReadyOps(), sk);
+ public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, ski.nioReadyOps(), ski);
}
- public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, 0, sk);
+ public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, 0, ski);
}
/**
* Translates an interest operation set into a native poll event set
*/
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
int newOps = 0;
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= Net.POLLIN;
@@ -1049,7 +1048,7 @@
newOps |= Net.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= Net.POLLCONN;
- sk.selector.putEventOps(sk, newOps);
+ return newOps;
}
public FileDescriptor getFD() {
--- a/src/java.base/solaris/classes/sun/nio/ch/DevPollSelectorImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/solaris/classes/sun/nio/ch/DevPollSelectorImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -27,15 +27,11 @@
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
-import java.util.BitSet;
import java.util.Deque;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -59,14 +55,11 @@
// maps file descriptor to selection key, synchronize on selector
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
- // file descriptors registered with /dev/poll, synchronize on selector
- private final BitSet registered = new BitSet();
-
// pending new registrations/updates, queued by implRegister and putEventOps
private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
- private final Deque<Integer> updateOps = new ArrayDeque<>();
+ private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing
private final Object interruptLock = new Object();
@@ -99,15 +92,16 @@
throws IOException
{
assert Thread.holdsLock(this);
+ boolean blocking = (timeout != 0);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
- begin();
+ begin(blocking);
numEntries = pollWrapper.poll(timeout);
} finally {
- end();
+ end(blocking);
}
processDeregisterQueue();
return updateSelectedKeys(numEntries);
@@ -125,41 +119,35 @@
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
- SelChImpl ch = ski.channel;
- int fd = ch.getFDVal();
+ int fd = ski.channel.getFDVal();
SelectionKeyImpl previous = fdToKey.put(fd, ski);
assert previous == null;
- assert registered.get(fd) == false;
+ assert ski.registeredEvents() == 0;
}
}
// Translate the queued updates to changes to the set of monitored
// file descriptors. The changes are written to the /dev/poll driver
// in bulk.
- assert updateKeys.size() == updateOps.size();
+ assert updateKeys.size() == updateEvents.size();
int index = 0;
while ((ski = updateKeys.pollFirst()) != null) {
- int ops = updateOps.pollFirst();
+ int newEvents = updateEvents.pollFirst();
int fd = ski.channel.getFDVal();
if (ski.isValid() && fdToKey.containsKey(fd)) {
- if (registered.get(fd)) {
- if (ops == 0) {
- // remove file descriptor
- pollWrapper.putPollFD(index++, fd, POLLREMOVE);
- registered.clear(fd);
- } else {
- // change events
+ int registeredEvents = ski.registeredEvents();
+ if (newEvents != registeredEvents) {
+ if (registeredEvents != 0)
pollWrapper.putPollFD(index++, fd, POLLREMOVE);
- pollWrapper.putPollFD(index++, fd, (short)ops);
+ if (newEvents != 0)
+ pollWrapper.putPollFD(index++, fd, (short)newEvents);
+ ski.registeredEvents(newEvents);
+
+ // write to /dev/poll
+ if (index > (NUM_POLLFDS-2)) {
+ pollWrapper.registerMultiple(index);
+ index = 0;
}
- } else if (ops != 0) {
- // add file descriptor
- pollWrapper.putPollFD(index++, fd, (short)ops);
- registered.set(fd);
- }
- if (index > (NUM_POLLFDS-2)) {
- pollWrapper.registerMultiple(index);
- index = 0;
}
}
}
@@ -171,8 +159,9 @@
}
/**
- * Update the keys whose fd's have been selected by the /dev/poll.
- * Add the ready keys to the ready queue.
+ * Update the keys of file descriptors that were polled and add them to
+ * the selected-key set.
+ * If the interrupt fd has been selected, drain it and clear the interrupt.
*/
private int updateSelectedKeys(int numEntries) throws IOException {
assert Thread.holdsLock(this);
@@ -214,7 +203,6 @@
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
// prevent further wakeup
synchronized (interruptLock) {
@@ -224,59 +212,37 @@
pollWrapper.close();
FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1);
-
- // Deregister channels
- Iterator<SelectionKey> i = keys.iterator();
- while (i.hasNext()) {
- SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
- deregister(ski);
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
- i.remove();
- }
}
@Override
protected void implRegister(SelectionKeyImpl ski) {
- assert Thread.holdsLock(nioKeys());
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
- keys.add(ski);
}
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid();
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
- assert Thread.holdsLock(nioSelectedKeys());
int fd = ski.channel.getFDVal();
- fdToKey.remove(fd);
- if (registered.get(fd)) {
- pollWrapper.register(fd, POLLREMOVE);
- registered.clear(fd);
+ if (fdToKey.remove(fd) != null) {
+ if (ski.registeredEvents() != 0) {
+ pollWrapper.register(fd, POLLREMOVE);
+ ski.registeredEvents(0);
+ }
+ } else {
+ assert ski.registeredEvents() == 0;
}
-
- selectedKeys.remove(ski);
- keys.remove(ski);
-
- // remove from channel's key set
- deregister(ski);
-
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl) selch).kill();
}
@Override
- public void putEventOps(SelectionKeyImpl ski, int ops) {
+ public void putEventOps(SelectionKeyImpl ski, int events) {
ensureOpen();
synchronized (updateLock) {
- updateOps.addLast(ops); // ops first in case adding the key fails
+ updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski);
}
}
--- a/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -27,17 +27,14 @@
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import jdk.internal.misc.Unsafe;
import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_FD;
import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_USER;
@@ -69,18 +66,8 @@
private final long pollArrayAddress;
private final AllocatedNativeObject pollArray;
- // a registration of a file descriptor with a selector
- private static class RegEntry {
- final SelectionKeyImpl ski;
- int registeredOps;
- int lastUpdate;
- RegEntry(SelectionKeyImpl ski) {
- this.ski = ski;
- }
- }
-
- // maps a file descriptor to registration entry, synchronize on selector
- private final Map<Integer, RegEntry> fdToRegEntry = new HashMap<>();
+ // maps file descriptor to selection key, synchronize on selector
+ private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
// the last update operation, incremented by processUpdateQueue
private int lastUpdate;
@@ -90,7 +77,7 @@
private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
- private final Deque<Integer> updateOps = new ArrayDeque<>();
+ private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing
private final Object interruptLock = new Object();
@@ -115,14 +102,16 @@
protected int doSelect(long timeout) throws IOException {
assert Thread.holdsLock(this);
+ long to = timeout;
+ boolean blocking = (to != 0);
+ boolean timedPoll = (to > 0);
+
int numEvents;
processUpdateQueue();
processDeregisterQueue();
try {
- begin();
+ begin(blocking);
- long to = timeout;
- boolean timedPoll = (to > 0);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEvents = port_getn(pfd, pollArrayAddress, MAX_EVENTS, to);
@@ -139,7 +128,7 @@
assert IOStatus.check(numEvents);
} finally {
- end();
+ end(blocking);
}
processDeregisterQueue();
return processPortEvents(numEvents);
@@ -161,30 +150,27 @@
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
- SelChImpl ch = ski.channel;
- int fd = ch.getFDVal();
- RegEntry previous = fdToRegEntry.put(fd, new RegEntry(ski));
+ int fd = ski.channel.getFDVal();
+ SelectionKeyImpl previous = fdToKey.put(fd, ski);
assert previous == null;
+ assert ski.registeredEvents() == 0;
}
}
// changes to interest ops
- assert updateKeys.size() == updateOps.size();
+ assert updateKeys.size() == updateEvents.size();
while ((ski = updateKeys.pollFirst()) != null) {
- int ops = updateOps.pollFirst();
+ int newEvents = updateEvents.pollFirst();
int fd = ski.channel.getFDVal();
- RegEntry e = fdToRegEntry.get(fd);
- if (ski.isValid() && (e != null) && (e.lastUpdate != lastUpdate)) {
- assert e.ski == ski;
- if ((ops != e.registeredOps)) {
- if (ops == 0) {
+ if (ski.isValid() && fdToKey.containsKey(fd)) {
+ if (newEvents != ski.registeredEvents()) {
+ if (newEvents == 0) {
port_dissociate(pfd, PORT_SOURCE_FD, fd);
} else {
- port_associate(pfd, PORT_SOURCE_FD, fd, ops);
+ port_associate(pfd, PORT_SOURCE_FD, fd, newEvents);
}
- e.registeredOps = ops;
+ ski.registeredEvents(newEvents);
}
- e.lastUpdate = lastUpdate;
}
}
}
@@ -209,9 +195,8 @@
short source = getSource(i);
if (source == PORT_SOURCE_FD) {
int fd = getDescriptor(i);
- RegEntry e = fdToRegEntry.get(fd);
- if (e != null) {
- SelectionKeyImpl ski = e.ski;
+ SelectionKeyImpl ski = fdToKey.get(fd);
+ if (ski != null) {
int rOps = getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
@@ -225,12 +210,11 @@
}
}
- // queue selection key so that it is re-associated at
- // next select. Push to end of deque so that changes to
- // the interest ops are processed first
- updateKeys.addLast(ski);
- updateOps.addLast(e.registeredOps);
- e.registeredOps = 0;
+ // re-queue key to head so that it is re-associated at
+ // next select (and before other changes)
+ updateEvents.addFirst(ski.registeredEvents());
+ updateKeys.addFirst(ski);
+ ski.registeredEvents(0);
}
} else if (source == PORT_SOURCE_USER) {
interrupted = true;
@@ -250,7 +234,6 @@
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
// prevent further wakeup
synchronized (interruptLock) {
@@ -259,61 +242,38 @@
port_close(pfd);
pollArray.free();
-
- // Deregister channels
- Iterator<SelectionKey> i = keys.iterator();
- while (i.hasNext()) {
- SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
- deregister(ski);
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
- i.remove();
- }
}
@Override
protected void implRegister(SelectionKeyImpl ski) {
- assert Thread.holdsLock(nioKeys());
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
- keys.add(ski);
}
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid();
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
- assert Thread.holdsLock(nioSelectedKeys());
int fd = ski.channel.getFDVal();
- RegEntry e = fdToRegEntry.remove(fd);
- if (e != null && e.registeredOps != 0) {
- port_dissociate(pfd, PORT_SOURCE_FD, fd);
+ if (fdToKey.remove(fd) != null) {
+ if (ski.registeredEvents() != 0) {
+ port_dissociate(pfd, PORT_SOURCE_FD, fd);
+ ski.registeredEvents(0);
+ }
+ } else {
+ assert ski.registeredEvents() == 0;
}
-
- selectedKeys.remove(ski);
- keys.remove(ski);
-
- // remove from channel's key set
- deregister(ski);
-
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl) selch).kill();
}
@Override
- public void putEventOps(SelectionKeyImpl ski, int ops) {
+ public void putEventOps(SelectionKeyImpl ski, int events) {
ensureOpen();
synchronized (updateLock) {
- // push to front of deque so that it processed before other
- // updates for the same key.
- updateOps.addFirst(ops);
- updateKeys.addFirst(ski);
+ updateEvents.addLast(events); // events first in case adding key fails
+ updateKeys.addLast(ski);
}
}
--- a/src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -26,14 +26,11 @@
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -63,7 +60,7 @@
// pending updates, queued by putEventOps
private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
- private final Deque<Integer> updateOps = new ArrayDeque<>();
+ private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing
private final Object interruptLock = new Object();
@@ -99,13 +96,15 @@
protected int doSelect(long timeout) throws IOException {
assert Thread.holdsLock(this);
+ int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout
+ boolean blocking = (to != 0);
+ boolean timedPoll = (to > 0);
+
processUpdateQueue();
processDeregisterQueue();
try {
- begin();
+ begin(blocking);
- int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout
- boolean timedPoll = (to > 0);
int numPolled;
do {
long startTime = timedPoll ? System.nanoTime() : 0;
@@ -123,7 +122,7 @@
assert numPolled <= pollArraySize;
} finally {
- end();
+ end(blocking);
}
processDeregisterQueue();
@@ -137,23 +136,22 @@
assert Thread.holdsLock(this);
synchronized (updateLock) {
- assert updateKeys.size() == updateOps.size();
-
+ assert updateKeys.size() == updateEvents.size();
SelectionKeyImpl ski;
while ((ski = updateKeys.pollFirst()) != null) {
- int ops = updateOps.pollFirst();
+ int newEvents = updateEvents.pollFirst();
if (ski.isValid()) {
int index = ski.getIndex();
assert index >= 0 && index < pollArraySize;
if (index > 0) {
assert pollKeys.get(index) == ski;
- if (ops == 0) {
+ if (newEvents == 0) {
remove(ski);
} else {
- update(ski, ops);
+ update(ski, newEvents);
}
- } else if (ops != 0) {
- add(ski, ops);
+ } else if (newEvents != 0) {
+ add(ski, newEvents);
}
}
}
@@ -161,8 +159,8 @@
}
/**
- * Update the keys whose fd's have been selected by kqueue.
- * Add the ready keys to the selected key set.
+ * Update the keys of file descriptors that were polled and add them to
+ * the selected-key set.
* If the interrupt fd has been selected, drain it and clear the interrupt.
*/
private int updateSelectedKeys() throws IOException {
@@ -205,7 +203,6 @@
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
// prevent further wakeup
synchronized (interruptLock) {
@@ -215,59 +212,31 @@
pollArray.free();
FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1);
-
- // Deregister channels
- Iterator<SelectionKey> i = keys.iterator();
- while (i.hasNext()) {
- SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
- ski.setIndex(-1);
- deregister(ski);
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
- i.remove();
- }
}
@Override
protected void implRegister(SelectionKeyImpl ski) {
assert ski.getIndex() == 0;
- assert Thread.holdsLock(nioKeys());
-
ensureOpen();
- keys.add(ski);
}
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid();
assert Thread.holdsLock(this);
- assert Thread.holdsLock(nioKeys());
- assert Thread.holdsLock(nioSelectedKeys());
// remove from poll array
int index = ski.getIndex();
if (index > 0) {
remove(ski);
}
-
- // remove from selected-key and key set
- selectedKeys.remove(ski);
- keys.remove(ski);
-
- // remove from channel's key set
- deregister(ski);
-
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl) selch).kill();
}
@Override
- public void putEventOps(SelectionKeyImpl ski, int ops) {
+ public void putEventOps(SelectionKeyImpl ski, int events) {
ensureOpen();
synchronized (updateLock) {
- updateOps.addLast(ops); // ops first in case adding the key fails
+ updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski);
}
}
--- a/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -164,10 +164,9 @@
}
}
- public boolean translateReadyOps(int ops, int initialOps,
- SelectionKeyImpl sk) {
- int intOps = sk.nioInterestOps();// Do this just once, it synchronizes
- int oldOps = sk.nioReadyOps();
+ public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
+ int intOps = ski.nioInterestOps();
+ int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0)
@@ -175,7 +174,7 @@
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
@@ -183,22 +182,23 @@
((intOps & SelectionKey.OP_WRITE) != 0))
newOps |= SelectionKey.OP_WRITE;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
- public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, sk.nioReadyOps(), sk);
+ public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, ski.nioReadyOps(), ski);
}
- public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, 0, sk);
+ public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, 0, ski);
}
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
+ int newOps = 0;
if (ops == SelectionKey.OP_WRITE)
- ops = Net.POLLOUT;
- sk.selector.putEventOps(sk, ops);
+ newOps |= Net.POLLOUT;
+ return newOps;
}
/**
--- a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -164,10 +164,9 @@
}
}
- public boolean translateReadyOps(int ops, int initialOps,
- SelectionKeyImpl sk) {
- int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
- int oldOps = sk.nioReadyOps();
+ public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
+ int intOps = ski.nioInterestOps();
+ int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0)
@@ -175,7 +174,7 @@
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
@@ -183,22 +182,23 @@
((intOps & SelectionKey.OP_READ) != 0))
newOps |= SelectionKey.OP_READ;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
- public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, sk.nioReadyOps(), sk);
+ public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, ski.nioReadyOps(), ski);
}
- public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, 0, sk);
+ public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, 0, ski);
}
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
+ int newOps = 0;
if (ops == SelectionKey.OP_READ)
- ops = Net.POLLIN;
- sk.selector.putEventOps(sk, ops);
+ newOps |= Net.POLLIN;
+ return newOps;
}
/**
--- a/src/java.base/windows/classes/sun/nio/ch/PollArrayWrapper.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/windows/classes/sun/nio/ch/PollArrayWrapper.java Fri Mar 30 08:28:09 2018 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -63,8 +63,9 @@
}
// Prepare another pollfd struct for use.
- void addEntry(int index, SelectionKeyImpl ski) {
+ void putEntry(int index, SelectionKeyImpl ski) {
putDescriptor(index, ski.channel.getFDVal());
+ putEventOps(index, 0);
}
// Writes the pollfd entry from the source wrapper at the source index
--- a/src/java.base/windows/classes/sun/nio/ch/SinkChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/windows/classes/sun/nio/ch/SinkChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -72,10 +72,9 @@
sc.configureBlocking(block);
}
- public boolean translateReadyOps(int ops, int initialOps,
- SelectionKeyImpl sk) {
- int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
- int oldOps = sk.nioReadyOps();
+ public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
+ int intOps = ski.nioInterestOps();
+ int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0)
@@ -83,7 +82,7 @@
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
@@ -91,22 +90,23 @@
((intOps & SelectionKey.OP_WRITE) != 0))
newOps |= SelectionKey.OP_WRITE;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
- public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, sk.nioReadyOps(), sk);
+ public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, ski.nioReadyOps(), ski);
}
- public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, 0, sk);
+ public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, 0, ski);
}
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
+ int newOps = 0;
if ((ops & SelectionKey.OP_WRITE) != 0)
- ops = Net.POLLOUT;
- sk.selector.putEventOps(sk, ops);
+ newOps |= Net.POLLOUT;
+ return newOps;
}
public int write(ByteBuffer src) throws IOException {
--- a/src/java.base/windows/classes/sun/nio/ch/SourceChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/windows/classes/sun/nio/ch/SourceChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -71,10 +71,9 @@
sc.configureBlocking(block);
}
- public boolean translateReadyOps(int ops, int initialOps,
- SelectionKeyImpl sk) {
- int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
- int oldOps = sk.nioReadyOps();
+ public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
+ int intOps = ski.nioInterestOps();
+ int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0)
@@ -82,7 +81,7 @@
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
@@ -90,22 +89,23 @@
((intOps & SelectionKey.OP_READ) != 0))
newOps |= SelectionKey.OP_READ;
- sk.nioReadyOps(newOps);
+ ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
- public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, sk.nioReadyOps(), sk);
+ public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, ski.nioReadyOps(), ski);
}
- public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
- return translateReadyOps(ops, 0, sk);
+ public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
+ return translateReadyOps(ops, 0, ski);
}
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
+ int newOps = 0;
if ((ops & SelectionKey.OP_READ) != 0)
- ops = Net.POLLIN;
- sk.selector.putEventOps(sk, ops);
+ newOps |= Net.POLLIN;
+ return newOps;
}
public int read(ByteBuffer dst) throws IOException {
--- a/src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -23,23 +23,19 @@
* questions.
*/
-/*
- */
-
-
package sun.nio.ch;
-import java.nio.channels.spi.SelectorProvider;
-import java.nio.channels.Selector;
+import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Pipe;
-import java.nio.channels.SelectableChannel;
-import java.io.IOException;
-import java.nio.channels.CancelledKeyException;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
import java.util.List;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.Map;
/**
* A multi-threaded implementation of Selector for Windows.
@@ -80,9 +76,6 @@
// File descriptors corresponding to source and sink
private final int wakeupSourceFd, wakeupSinkFd;
- // Lock for close cleanup
- private final Object closeLock = new Object();
-
// Maps file descriptors to their indices in pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
@@ -103,7 +96,7 @@
// class for fdMap entries
private static final class MapEntry {
- SelectionKeyImpl ski;
+ final SelectionKeyImpl ski;
long updateCount = 0;
long clearedCount = 0;
MapEntry(SelectionKeyImpl ski) {
@@ -121,6 +114,13 @@
private final Object interruptLock = new Object();
private volatile boolean interruptTriggered;
+ // pending new registrations/updates, queued by implRegister and putEventOps
+ private final Object updateLock = new Object();
+ private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
+ private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
+ private final Deque<Integer> updateEvents = new ArrayDeque<>();
+
+
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
@@ -135,11 +135,16 @@
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
+ private void ensureOpen() {
+ if (!isOpen())
+ throw new ClosedSelectorException();
+ }
+
@Override
protected int doSelect(long timeout) throws IOException {
- if (channelArray == null)
- throw new ClosedSelectorException();
+ assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
+ processUpdateQueue();
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
@@ -176,6 +181,42 @@
return updated;
}
+ /**
+ * Process new registrations and changes to the interest ops.
+ */
+ private void processUpdateQueue() {
+ assert Thread.holdsLock(this);
+
+ synchronized (updateLock) {
+ SelectionKeyImpl ski;
+
+ // new registrations
+ while ((ski = newKeys.pollFirst()) != null) {
+ if (ski.isValid()) {
+ growIfNeeded();
+ channelArray[totalChannels] = ski;
+ ski.setIndex(totalChannels);
+ pollWrapper.putEntry(totalChannels, ski);
+ totalChannels++;
+ MapEntry previous = fdMap.put(ski);
+ assert previous == null;
+ }
+ }
+
+ // changes to interest ops
+ assert updateKeys.size() == updateEvents.size();
+ while ((ski = updateKeys.pollFirst()) != null) {
+ int events = updateEvents.pollFirst();
+ int fd = ski.channel.getFDVal();
+ if (ski.isValid() && fdMap.containsKey(fd)) {
+ int index = ski.getIndex();
+ assert index >= 0 && index < totalChannels;
+ pollWrapper.putEventOps(index, events);
+ }
+ }
+ }
+ }
+
// Helper threads wait on this lock for the next poll.
private final StartLock startLock = new StartLock();
@@ -503,46 +544,29 @@
@Override
protected void implClose() throws IOException {
- synchronized (closeLock) {
- if (channelArray != null) {
- if (pollWrapper != null) {
- // prevent further wakeup
- synchronized (interruptLock) {
- interruptTriggered = true;
- }
- wakeupPipe.sink().close();
- wakeupPipe.source().close();
- for(int i = 1; i < totalChannels; i++) { // Deregister channels
- if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
- deregister(channelArray[i]);
- SelectableChannel selch = channelArray[i].channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
- }
- }
- pollWrapper.free();
- pollWrapper = null;
- channelArray = null;
- // Make all remaining helper threads exit
- for (SelectThread t: threads)
- t.makeZombie();
- startLock.startThreads();
- }
- }
+ assert !isOpen();
+ assert Thread.holdsLock(this);
+
+ // prevent further wakeup
+ synchronized (interruptLock) {
+ interruptTriggered = true;
}
+
+ wakeupPipe.sink().close();
+ wakeupPipe.source().close();
+ pollWrapper.free();
+
+ // Make all remaining helper threads exit
+ for (SelectThread t: threads)
+ t.makeZombie();
+ startLock.startThreads();
}
+ @Override
protected void implRegister(SelectionKeyImpl ski) {
- synchronized (closeLock) {
- if (pollWrapper == null)
- throw new ClosedSelectorException();
- growIfNeeded();
- channelArray[totalChannels] = ski;
- ski.setIndex(totalChannels);
- fdMap.put(ski);
- keys.add(ski);
- pollWrapper.addEntry(totalChannels, ski);
- totalChannels++;
+ ensureOpen();
+ synchronized (updateLock) {
+ newKeys.addLast(ski);
}
}
@@ -561,47 +585,43 @@
}
}
- protected void implDereg(SelectionKeyImpl ski) throws IOException{
- int i = ski.getIndex();
- assert (i >= 0);
- synchronized (closeLock) {
+ @Override
+ protected void implDereg(SelectionKeyImpl ski) {
+ assert !ski.isValid();
+ assert Thread.holdsLock(this);
+
+ if (fdMap.remove(ski) != null) {
+ int i = ski.getIndex();
+ assert (i >= 0);
+
if (i != totalChannels - 1) {
// Copy end one over it
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
- pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
- pollWrapper, i);
+ pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
}
ski.setIndex(-1);
- }
- channelArray[totalChannels - 1] = null;
- totalChannels--;
- if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
+
+ channelArray[totalChannels - 1] = null;
totalChannels--;
- threadsCount--; // The last thread has become redundant.
- }
- fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
- keys.remove(ski);
- selectedKeys.remove(ski);
- deregister(ski);
- SelectableChannel selch = ski.channel();
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
- }
-
- public void putEventOps(SelectionKeyImpl sk, int ops) {
- synchronized (closeLock) {
- if (pollWrapper == null)
- throw new ClosedSelectorException();
- // make sure this sk has not been removed yet
- int index = sk.getIndex();
- if (index == -1)
- throw new CancelledKeyException();
- pollWrapper.putEventOps(index, ops);
+ if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
+ totalChannels--;
+ threadsCount--; // The last thread has become redundant.
+ }
}
}
+ @Override
+ public void putEventOps(SelectionKeyImpl ski, int events) {
+ ensureOpen();
+ synchronized (updateLock) {
+ updateEvents.addLast(events); // events first in case adding key fails
+ updateKeys.addLast(ski);
+ }
+ }
+
+ @Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
--- a/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -641,7 +641,7 @@
}
@Override
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
int newOps = 0;
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= Net.POLLIN;
@@ -649,7 +649,7 @@
newOps |= Net.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= Net.POLLCONN;
- sk.selector.putEventOps(sk, newOps);
+ return newOps;
}
@Override
--- a/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpMultiChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpMultiChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -356,13 +356,13 @@
}
@Override
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
int newOps = 0;
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= Net.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= Net.POLLOUT;
- sk.selector.putEventOps(sk, newOps);
+ return newOps;
}
@Override
--- a/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpServerChannelImpl.java Thu Mar 29 22:12:05 2018 -0700
+++ b/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpServerChannelImpl.java Fri Mar 30 08:28:09 2018 +0100
@@ -345,15 +345,11 @@
}
@Override
- public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
+ public int translateInterestOps(int ops) {
int newOps = 0;
-
- /* Translate ops */
if ((ops & SelectionKey.OP_ACCEPT) != 0)
newOps |= Net.POLLIN;
- /* Place ops into pollfd array */
- sk.selector.putEventOps(sk, newOps);
-
+ return newOps;
}
@Override