--- a/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java Sat Mar 24 14:43:04 2018 +0900
+++ b/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java Sat Mar 24 08:49:55 2018 +0000
@@ -26,11 +26,31 @@
package sun.nio.ch;
import java.io.IOException;
-import java.nio.channels.*;
-import java.nio.channels.spi.*;
-import java.util.Map;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import jdk.internal.misc.Unsafe;
+
+import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_FD;
+import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_USER;
+import static sun.nio.ch.SolarisEventPort.SIZEOF_PORT_EVENT;
+import static sun.nio.ch.SolarisEventPort.OFFSETOF_EVENTS;
+import static sun.nio.ch.SolarisEventPort.OFFSETOF_SOURCE;
+import static sun.nio.ch.SolarisEventPort.OFFSETOF_OBJECT;
+import static sun.nio.ch.SolarisEventPort.port_create;
+import static sun.nio.ch.SolarisEventPort.port_close;
+import static sun.nio.ch.SolarisEventPort.port_associate;
+import static sun.nio.ch.SolarisEventPort.port_dissociate;
+import static sun.nio.ch.SolarisEventPort.port_getn;
+import static sun.nio.ch.SolarisEventPort.port_send;
/**
* Selector implementation based on the Solaris event port mechanism.
@@ -39,89 +59,206 @@
class EventPortSelectorImpl
extends SelectorImpl
{
- private final EventPortWrapper pollWrapper;
+ // maximum number of events to retrive in one call to port_getn
+ static final int MAX_EVENTS = Math.min(IOUtil.fdLimit()-1, 1024);
- // Maps from file descriptors to keys
- private final Map<Integer, SelectionKeyImpl> fdToKey;
+ // port file descriptor
+ private final int pfd;
+
+ // the poll array (populated by port_getn)
+ private final long pollArrayAddress;
+ private final AllocatedNativeObject pollArray;
- // True if this Selector has been closed
- private boolean closed;
+ // a registration of a file descriptor with a selector
+ private static class RegEntry {
+ final SelectionKeyImpl ski;
+ int registeredOps;
+ int lastUpdate;
+ RegEntry(SelectionKeyImpl ski) {
+ this.ski = ski;
+ }
+ }
- // Lock for interrupt triggering and clearing
+ // maps a file descriptor to registration entry, synchronize on selector
+ private final Map<Integer, RegEntry> fdToRegEntry = new HashMap<>();
+
+ // the last update operation, incremented by processUpdateQueue
+ private int lastUpdate;
+
+ // pending new registrations/updates, queued by implRegister, putEventOps,
+ // and updateSelectedKeys
+ private final Object updateLock = new Object();
+ private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
+ private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
+ private final Deque<Integer> updateOps = new ArrayDeque<>();
+
+ // interrupt triggering and clearing
private final Object interruptLock = new Object();
private boolean interruptTriggered;
- /**
- * Package private constructor called by factory method in
- * the abstract superclass Selector.
- */
EventPortSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
- pollWrapper = new EventPortWrapper();
- fdToKey = new HashMap<>();
+
+ this.pfd = port_create();
+
+ int allocationSize = MAX_EVENTS * SIZEOF_PORT_EVENT;
+ this.pollArray = new AllocatedNativeObject(allocationSize, false);
+ this.pollArrayAddress = pollArray.address();
}
private void ensureOpen() {
- if (closed)
+ if (!isOpen())
throw new ClosedSelectorException();
}
@Override
protected int doSelect(long timeout) throws IOException {
- ensureOpen();
+ assert Thread.holdsLock(this);
+
+ int numEvents;
+ processUpdateQueue();
processDeregisterQueue();
- int entries;
try {
begin();
- entries = pollWrapper.poll(timeout);
+
+ long to = timeout;
+ boolean timedPoll = (to > 0);
+ do {
+ long startTime = timedPoll ? System.nanoTime() : 0;
+ numEvents = port_getn(pfd, pollArrayAddress, MAX_EVENTS, to);
+ if (numEvents == IOStatus.INTERRUPTED && timedPoll) {
+ // timed poll interrupted so need to adjust timeout
+ long adjust = System.nanoTime() - startTime;
+ to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
+ if (to <= 0) {
+ // timeout also expired so no retry
+ numEvents = 0;
+ }
+ }
+ } while (numEvents == IOStatus.INTERRUPTED);
+ assert IOStatus.check(numEvents);
+
} finally {
end();
}
processDeregisterQueue();
- int numKeysUpdated = updateSelectedKeys(entries);
- if (pollWrapper.interrupted()) {
- synchronized (interruptLock) {
- interruptTriggered = false;
+ return processPortEvents(numEvents);
+ }
+
+ /**
+ * Process new registrations and changes to the interest ops.
+ */
+ private void processUpdateQueue() throws IOException {
+ assert Thread.holdsLock(this);
+
+ // bump lastUpdate to ensure that the interest ops are changed at most
+ // once per bulk update
+ lastUpdate++;
+
+ synchronized (updateLock) {
+ SelectionKeyImpl ski;
+
+ // new registrations
+ while ((ski = newKeys.pollFirst()) != null) {
+ if (ski.isValid()) {
+ SelChImpl ch = ski.channel;
+ int fd = ch.getFDVal();
+ RegEntry previous = fdToRegEntry.put(fd, new RegEntry(ski));
+ assert previous == null;
+ }
+ }
+
+ // changes to interest ops
+ assert updateKeys.size() == updateOps.size();
+ while ((ski = updateKeys.pollFirst()) != null) {
+ int ops = updateOps.pollFirst();
+ int fd = ski.channel.getFDVal();
+ RegEntry e = fdToRegEntry.get(fd);
+ if (ski.isValid() && (e != null) && (e.lastUpdate != lastUpdate)) {
+ assert e.ski == ski;
+ if ((ops != e.registeredOps)) {
+ if (ops == 0) {
+ port_dissociate(pfd, PORT_SOURCE_FD, fd);
+ } else {
+ port_associate(pfd, PORT_SOURCE_FD, fd, ops);
+ }
+ e.registeredOps = ops;
+ }
+ e.lastUpdate = lastUpdate;
+ }
}
}
- return numKeysUpdated;
}
- private int updateSelectedKeys(int entries) {
+ /**
+ * Process the port events. This method updates the keys of file descriptors
+ * that were polled. It also re-queues the key so that the file descriptor
+ * is re-associated at the next select operation.
+ *
+ * @return the number of selection keys updated.
+ */
+ private int processPortEvents(int numEvents) throws IOException {
+ assert Thread.holdsLock(this);
+ assert Thread.holdsLock(nioSelectedKeys());
+
int numKeysUpdated = 0;
- for (int i=0; i<entries; i++) {
- int nextFD = pollWrapper.getDescriptor(i);
- SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
- if (ski != null) {
- int rOps = pollWrapper.getEventOps(i);
- if (selectedKeys.contains(ski)) {
- if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
- numKeysUpdated++;
+ boolean interrupted = false;
+
+ synchronized (updateLock) {
+ for (int i = 0; i < numEvents; i++) {
+ short source = getSource(i);
+ if (source == PORT_SOURCE_FD) {
+ int fd = getDescriptor(i);
+ RegEntry e = fdToRegEntry.get(fd);
+ if (e != null) {
+ SelectionKeyImpl ski = e.ski;
+ int rOps = getEventOps(i);
+ if (selectedKeys.contains(ski)) {
+ if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
+ numKeysUpdated++;
+ }
+ } else {
+ ski.channel.translateAndSetReadyOps(rOps, ski);
+ if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
+ selectedKeys.add(ski);
+ numKeysUpdated++;
+ }
+ }
+
+ // queue selection key so that it is re-associated at
+ // next select. Push to end of deque so that changes to
+ // the interest ops are processed first
+ updateKeys.addLast(ski);
+ updateOps.addLast(e.registeredOps);
+ e.registeredOps = 0;
}
+ } else if (source == PORT_SOURCE_USER) {
+ interrupted = true;
} else {
- ski.channel.translateAndSetReadyOps(rOps, ski);
- if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
- selectedKeys.add(ski);
- numKeysUpdated++;
- }
+ assert false;
}
}
}
+
+ if (interrupted) {
+ clearInterrupt();
+ }
return numKeysUpdated;
}
@Override
protected void implClose() throws IOException {
- if (closed)
- return;
- closed = true;
+ assert !isOpen();
+ assert Thread.holdsLock(this);
+ assert Thread.holdsLock(nioKeys());
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
- pollWrapper.close();
+ port_close(pfd);
+ pollArray.free();
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
@@ -137,42 +274,83 @@
@Override
protected void implRegister(SelectionKeyImpl ski) {
- int fd = IOUtil.fdVal(ski.channel.getFD());
- fdToKey.put(Integer.valueOf(fd), ski);
+ assert Thread.holdsLock(nioKeys());
+ ensureOpen();
+ synchronized (updateLock) {
+ newKeys.addLast(ski);
+ }
keys.add(ski);
}
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException {
- int i = ski.getIndex();
- assert (i >= 0);
+ assert !ski.isValid();
+ assert Thread.holdsLock(this);
+ assert Thread.holdsLock(nioKeys());
+ assert Thread.holdsLock(nioSelectedKeys());
+
int fd = ski.channel.getFDVal();
- fdToKey.remove(Integer.valueOf(fd));
- pollWrapper.release(fd);
- ski.setIndex(-1);
+ RegEntry e = fdToRegEntry.remove(fd);
+ if (e != null && e.registeredOps != 0) {
+ port_dissociate(pfd, PORT_SOURCE_FD, fd);
+ }
+
+ selectedKeys.remove(ski);
keys.remove(ski);
- selectedKeys.remove(ski);
- deregister((AbstractSelectionKey)ski);
+
+ // remove from channel's key set
+ deregister(ski);
+
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
+ ((SelChImpl) selch).kill();
}
@Override
- public void putEventOps(SelectionKeyImpl sk, int ops) {
+ public void putEventOps(SelectionKeyImpl ski, int ops) {
ensureOpen();
- int fd = sk.channel.getFDVal();
- pollWrapper.setInterest(fd, ops);
+ synchronized (updateLock) {
+ // push to front of deque so that it processed before other
+ // updates for the same key.
+ updateOps.addFirst(ops);
+ updateKeys.addFirst(ski);
+ }
}
@Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
- pollWrapper.interrupt();
+ try {
+ port_send(pfd, 0);
+ } catch (IOException ioe) {
+ throw new InternalError(ioe);
+ }
interruptTriggered = true;
}
}
return this;
}
+
+ private void clearInterrupt() throws IOException {
+ synchronized (interruptLock) {
+ interruptTriggered = false;
+ }
+ }
+
+ private short getSource(int i) {
+ int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_SOURCE;
+ return pollArray.getShort(offset);
+ }
+
+ private int getEventOps(int i) {
+ int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_EVENTS;
+ return pollArray.getInt(offset);
+ }
+
+ private int getDescriptor(int i) {
+ //assert Unsafe.getUnsafe().addressSize() == 8;
+ int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_OBJECT;
+ return (int) pollArray.getLong(offset);
+ }
}