8009751: (se) Selector spin when select, close and interestOps(0) invoked at same time (lnx)
Reviewed-by: zhouyx, chegar, robm
--- a/jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java Wed Mar 13 13:22:02 2013 +0400
+++ b/jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java Wed Mar 13 17:58:45 2013 +0000
@@ -26,9 +26,9 @@
package sun.nio.ch;
import java.io.IOException;
-import java.util.LinkedList;
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
/**
* Manipulates a native array of epoll_event structs on Linux:
@@ -52,37 +52,78 @@
* this implementation we set data.fd to be the file descriptor that we
* register. That way, we have the file descriptor available when we
* process the events.
- *
- * All file descriptors registered with epoll have the POLLHUP and POLLERR
- * events enabled even when registered with an event set of 0. To ensure
- * that epoll_wait doesn't poll an idle file descriptor when the underlying
- * connection is closed or reset then its registration is deleted from
- * epoll (it will be re-added again if the event set is changed)
*/
class EPollArrayWrapper {
// EPOLL_EVENTS
- static final int EPOLLIN = 0x001;
+ private static final int EPOLLIN = 0x001;
// opcodes
- static final int EPOLL_CTL_ADD = 1;
- static final int EPOLL_CTL_DEL = 2;
- static final int EPOLL_CTL_MOD = 3;
+ private static final int EPOLL_CTL_ADD = 1;
+ private static final int EPOLL_CTL_DEL = 2;
+ private static final int EPOLL_CTL_MOD = 3;
// Miscellaneous constants
- static final int SIZE_EPOLLEVENT = sizeofEPollEvent();
- static final int EVENT_OFFSET = 0;
- static final int DATA_OFFSET = offsetofData();
- static final int FD_OFFSET = DATA_OFFSET;
- static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 8192);
+ private static final int SIZE_EPOLLEVENT = sizeofEPollEvent();
+ private static final int EVENT_OFFSET = 0;
+ private static final int DATA_OFFSET = offsetofData();
+ private static final int FD_OFFSET = DATA_OFFSET;
+ private static final int OPEN_MAX = IOUtil.fdLimit();
+ private static final int NUM_EPOLLEVENTS = Math.min(OPEN_MAX, 8192);
+
+ // Special value to indicate that an update should be ignored
+ private static final byte KILLED = (byte)-1;
- // Base address of the native pollArray
+ // Initial size of arrays for fd registration changes
+ private static final int INITIAL_PENDING_UPDATE_SIZE = 64;
+
+ // maximum size of updatesLow
+ private static final int MAX_UPDATE_ARRAY_SIZE = Math.min(OPEN_MAX, 64*1024);
+
+
+ // The fd of the epoll driver
+ private final int epfd;
+
+ // The epoll_event array for results from epoll_wait
+ private final AllocatedNativeObject pollArray;
+
+ // Base address of the epoll_event array
private final long pollArrayAddress;
- // Set of "idle" channels
- private final HashSet<SelChImpl> idleSet;
+ // The fd of the interrupt line going out
+ private int outgoingInterruptFD;
+
+ // The fd of the interrupt line coming in
+ private int incomingInterruptFD;
+
+ // The index of the interrupt FD
+ private int interruptedIndex;
+
+ // Number of updated pollfd entries
+ int updated;
+
+ // object to synchronize fd registration changes
+ private final Object updateLock = new Object();
- EPollArrayWrapper() {
+ // number of file descriptors with registration changes pending
+ private int updateCount;
+
+ // file descriptors with registration changes pending
+ private int[] updateDescriptors = new int[INITIAL_PENDING_UPDATE_SIZE];
+
+ // events for file descriptors with registration changes pending, indexed
+ // by file descriptor and stored as bytes for efficiency reasons. For
+ // file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
+ // least) then the update is stored in a map.
+ private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
+ private Map<Integer,Byte> eventsHigh;
+
+ // Used by release and updateRegistrations to track whether a file
+ // descriptor is registered with epoll.
+ private final BitSet registered = new BitSet();
+
+
+ EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
epfd = epollCreate();
@@ -91,50 +132,11 @@
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
- for (int i=0; i<NUM_EPOLLEVENTS; i++) {
- putEventOps(i, 0);
- putData(i, 0L);
- }
-
- // create idle set
- idleSet = new HashSet<SelChImpl>();
+ // eventHigh needed when using file descriptors > 64k
+ if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
+ eventsHigh = new HashMap<>();
}
- // Used to update file description registrations
- private static class Updator {
- SelChImpl channel;
- int opcode;
- int events;
- Updator(SelChImpl channel, int opcode, int events) {
- this.channel = channel;
- this.opcode = opcode;
- this.events = events;
- }
- Updator(SelChImpl channel, int opcode) {
- this(channel, opcode, 0);
- }
- }
-
- private LinkedList<Updator> updateList = new LinkedList<Updator>();
-
- // The epoll_event array for results from epoll_wait
- private AllocatedNativeObject pollArray;
-
- // The fd of the epoll driver
- final int epfd;
-
- // The fd of the interrupt line going out
- int outgoingInterruptFD;
-
- // The fd of the interrupt line coming in
- int incomingInterruptFD;
-
- // The index of the interrupt FD
- int interruptedIndex;
-
- // Number of updated pollfd entries
- int updated;
-
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
@@ -146,11 +148,6 @@
pollArray.putInt(offset, event);
}
- void putData(int i, long value) {
- int offset = SIZE_EPOLLEVENT * i + DATA_OFFSET;
- pollArray.putLong(offset, value);
- }
-
void putDescriptor(int i, int fd) {
int offset = SIZE_EPOLLEVENT * i + FD_OFFSET;
pollArray.putInt(offset, fd);
@@ -167,51 +164,83 @@
}
/**
- * Update the events for a given channel.
+ * Sets the pending update events for the given file descriptor. This
+ * method has no effect if the update events is already set to KILLED,
+ * unless {@code force} is {@code true}.
*/
- void setInterest(SelChImpl channel, int mask) {
- synchronized (updateList) {
- // if the previous pending operation is to add this file descriptor
- // to epoll then update its event set
- if (updateList.size() > 0) {
- Updator last = updateList.getLast();
- if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
- last.events = mask;
- return;
- }
+ private void setUpdateEvents(int fd, byte events, boolean force) {
+ if (fd < MAX_UPDATE_ARRAY_SIZE) {
+ if ((eventsLow[fd] != KILLED) || force) {
+ eventsLow[fd] = events;
+ }
+ } else {
+ Integer key = Integer.valueOf(fd);
+ if ((eventsHigh.get(key) != KILLED) || force) {
+ eventsHigh.put(key, Byte.valueOf(events));
}
+ }
+ }
- // update existing registration
- updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
+ /**
+ * Returns the pending update events for the given file descriptor.
+ */
+ private byte getUpdateEvents(int fd) {
+ if (fd < MAX_UPDATE_ARRAY_SIZE) {
+ return eventsLow[fd];
+ } else {
+ Byte result = eventsHigh.get(Integer.valueOf(fd));
+ // result should never be null
+ return result.byteValue();
}
}
/**
- * Add a channel's file descriptor to epoll
+ * Update the events for a given file descriptor
*/
- void add(SelChImpl channel) {
- synchronized (updateList) {
- updateList.add(new Updator(channel, EPOLL_CTL_ADD));
+ void setInterest(int fd, int mask) {
+ synchronized (updateLock) {
+ // record the file descriptor and events
+ int oldCapacity = updateDescriptors.length;
+ if (updateCount == oldCapacity) {
+ int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
+ int[] newDescriptors = new int[newCapacity];
+ System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
+ updateDescriptors = newDescriptors;
+ }
+ updateDescriptors[updateCount++] = fd;
+
+ // events are stored as bytes for efficiency reasons
+ byte b = (byte)mask;
+ assert (b == mask) && (b != KILLED);
+ setUpdateEvents(fd, b, false);
}
}
/**
- * Remove a channel's file descriptor from epoll
+ * Add a file descriptor
*/
- void release(SelChImpl channel) {
- synchronized (updateList) {
- // flush any pending updates
- for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
- if (it.next().channel == channel) {
- it.remove();
- }
+ void add(int fd) {
+ // force the initial update events to 0 as it may be KILLED by a
+ // previous registration.
+ synchronized (updateLock) {
+ assert !registered.get(fd);
+ setUpdateEvents(fd, (byte)0, true);
+ }
+ }
+
+ /**
+ * Remove a file descriptor
+ */
+ void remove(int fd) {
+ synchronized (updateLock) {
+ // kill pending and future update for this file descriptor
+ setUpdateEvents(fd, KILLED, false);
+
+ // remove from epoll
+ if (registered.get(fd)) {
+ epollCtl(epfd, EPOLL_CTL_DEL, fd, 0);
+ registered.clear(fd);
}
-
- // remove from the idle set (if present)
- idleSet.remove(channel);
-
- // remove from epoll (if registered)
- epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
}
}
@@ -239,36 +268,38 @@
/**
* Update the pending registrations.
*/
- void updateRegistrations() {
- synchronized (updateList) {
- Updator u = null;
- while ((u = updateList.poll()) != null) {
- SelChImpl ch = u.channel;
- if (!ch.isOpen())
- continue;
+ private void updateRegistrations() {
+ synchronized (updateLock) {
+ int j = 0;
+ while (j < updateCount) {
+ int fd = updateDescriptors[j];
+ short events = getUpdateEvents(fd);
+ boolean isRegistered = registered.get(fd);
+ int opcode = 0;
- // if the events are 0 then file descriptor is put into "idle
- // set" to prevent it being polled
- if (u.events == 0) {
- boolean added = idleSet.add(u.channel);
- // if added to idle set then remove from epoll if registered
- if (added && (u.opcode == EPOLL_CTL_MOD))
- epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
- } else {
- // events are specified. If file descriptor was in idle set
- // it must be re-registered (by converting opcode to ADD)
- boolean idle = false;
- if (!idleSet.isEmpty())
- idle = idleSet.remove(u.channel);
- int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
- epollCtl(epfd, opcode, ch.getFDVal(), u.events);
+ if (events != KILLED) {
+ if (isRegistered) {
+ opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+ } else {
+ opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
+ }
+ if (opcode != 0) {
+ epollCtl(epfd, opcode, fd, events);
+ if (opcode == EPOLL_CTL_ADD) {
+ registered.set(fd);
+ } else if (opcode == EPOLL_CTL_DEL) {
+ registered.clear(fd);
+ }
+ }
}
+ j++;
}
+ updateCount = 0;
}
}
// interrupt support
- boolean interrupted = false;
+ private boolean interrupted = false;
public void interrupt() {
interrupt(outgoingInterruptFD);
--- a/jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java Wed Mar 13 13:22:02 2013 +0400
+++ b/jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java Wed Mar 13 17:58:45 2013 +0000
@@ -53,26 +53,24 @@
private volatile boolean closed = false;
// Lock for interrupt triggering and clearing
- private Object interruptLock = new Object();
+ private final Object interruptLock = new Object();
private boolean interruptTriggered = false;
/**
* Package private constructor called by factory method in
* the abstract superclass Selector.
*/
- EPollSelectorImpl(SelectorProvider sp) {
+ EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
- fdToKey = new HashMap<Integer,SelectionKeyImpl>();
+ fdToKey = new HashMap<>();
}
- protected int doSelect(long timeout)
- throws IOException
- {
+ protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
@@ -161,8 +159,9 @@
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
- fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);
- pollWrapper.add(ch);
+ int fd = Integer.valueOf(ch.getFDVal());
+ fdToKey.put(fd, ski);
+ pollWrapper.add(fd);
keys.add(ski);
}
@@ -171,7 +170,7 @@
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
fdToKey.remove(Integer.valueOf(fd));
- pollWrapper.release(ch);
+ pollWrapper.remove(fd);
ski.setIndex(-1);
keys.remove(ski);
selectedKeys.remove(ski);
@@ -181,10 +180,11 @@
((SelChImpl)selch).kill();
}
- public void putEventOps(SelectionKeyImpl sk, int ops) {
+ public void putEventOps(SelectionKeyImpl ski, int ops) {
if (closed)
throw new ClosedSelectorException();
- pollWrapper.setInterest(sk.channel, ops);
+ SelChImpl ch = ski.channel;
+ pollWrapper.setInterest(ch.getFDVal(), ops);
}
public Selector wakeup() {
@@ -200,5 +200,4 @@
static {
Util.load();
}
-
}