src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java
changeset 49417 1d3139252c1c
parent 49248 15a0e60c8b97
child 49493 814bd31f8da0
--- a/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java	Sat Mar 24 14:43:04 2018 +0900
+++ b/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java	Sat Mar 24 08:49:55 2018 +0000
@@ -26,11 +26,31 @@
 package sun.nio.ch;
 
 import java.io.IOException;
-import java.nio.channels.*;
-import java.nio.channels.spi.*;
-import java.util.Map;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import jdk.internal.misc.Unsafe;
+
+import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_FD;
+import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_USER;
+import static sun.nio.ch.SolarisEventPort.SIZEOF_PORT_EVENT;
+import static sun.nio.ch.SolarisEventPort.OFFSETOF_EVENTS;
+import static sun.nio.ch.SolarisEventPort.OFFSETOF_SOURCE;
+import static sun.nio.ch.SolarisEventPort.OFFSETOF_OBJECT;
+import static sun.nio.ch.SolarisEventPort.port_create;
+import static sun.nio.ch.SolarisEventPort.port_close;
+import static sun.nio.ch.SolarisEventPort.port_associate;
+import static sun.nio.ch.SolarisEventPort.port_dissociate;
+import static sun.nio.ch.SolarisEventPort.port_getn;
+import static sun.nio.ch.SolarisEventPort.port_send;
 
 /**
  * Selector implementation based on the Solaris event port mechanism.
@@ -39,89 +59,206 @@
 class EventPortSelectorImpl
     extends SelectorImpl
 {
-    private final EventPortWrapper pollWrapper;
+    // maximum number of events to retrive in one call to port_getn
+    static final int MAX_EVENTS = Math.min(IOUtil.fdLimit()-1, 1024);
 
-    // Maps from file descriptors to keys
-    private final Map<Integer, SelectionKeyImpl> fdToKey;
+    // port file descriptor
+    private final int pfd;
+
+    // the poll array (populated by port_getn)
+    private final long pollArrayAddress;
+    private final AllocatedNativeObject pollArray;
 
-    // True if this Selector has been closed
-    private boolean closed;
+    // a registration of a file descriptor with a selector
+    private static class RegEntry {
+        final SelectionKeyImpl ski;
+        int registeredOps;
+        int lastUpdate;
+        RegEntry(SelectionKeyImpl ski) {
+            this.ski = ski;
+        }
+    }
 
-    // Lock for interrupt triggering and clearing
+    // maps a file descriptor to registration entry, synchronize on selector
+    private final Map<Integer, RegEntry> fdToRegEntry = new HashMap<>();
+
+    // the last update operation, incremented by processUpdateQueue
+    private int lastUpdate;
+
+    // pending new registrations/updates, queued by implRegister, putEventOps,
+    // and updateSelectedKeys
+    private final Object updateLock = new Object();
+    private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
+    private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
+    private final Deque<Integer> updateOps = new ArrayDeque<>();
+
+    // interrupt triggering and clearing
     private final Object interruptLock = new Object();
     private boolean interruptTriggered;
 
-    /**
-     * Package private constructor called by factory method in
-     * the abstract superclass Selector.
-     */
     EventPortSelectorImpl(SelectorProvider sp) throws IOException {
         super(sp);
-        pollWrapper = new EventPortWrapper();
-        fdToKey = new HashMap<>();
+
+        this.pfd = port_create();
+
+        int allocationSize = MAX_EVENTS * SIZEOF_PORT_EVENT;
+        this.pollArray = new AllocatedNativeObject(allocationSize, false);
+        this.pollArrayAddress = pollArray.address();
     }
 
     private void ensureOpen() {
-        if (closed)
+        if (!isOpen())
             throw new ClosedSelectorException();
     }
 
     @Override
     protected int doSelect(long timeout) throws IOException {
-        ensureOpen();
+        assert Thread.holdsLock(this);
+
+        int numEvents;
+        processUpdateQueue();
         processDeregisterQueue();
-        int entries;
         try {
             begin();
-            entries = pollWrapper.poll(timeout);
+
+            long to = timeout;
+            boolean timedPoll = (to > 0);
+            do {
+                long startTime = timedPoll ? System.nanoTime() : 0;
+                numEvents = port_getn(pfd, pollArrayAddress, MAX_EVENTS, to);
+                if (numEvents == IOStatus.INTERRUPTED && timedPoll) {
+                    // timed poll interrupted so need to adjust timeout
+                    long adjust = System.nanoTime() - startTime;
+                    to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
+                    if (to <= 0) {
+                        // timeout also expired so no retry
+                        numEvents = 0;
+                    }
+                }
+            } while (numEvents == IOStatus.INTERRUPTED);
+            assert IOStatus.check(numEvents);
+
         } finally {
             end();
         }
         processDeregisterQueue();
-        int numKeysUpdated = updateSelectedKeys(entries);
-        if (pollWrapper.interrupted()) {
-            synchronized (interruptLock) {
-                interruptTriggered = false;
+        return processPortEvents(numEvents);
+    }
+
+    /**
+     * Process new registrations and changes to the interest ops.
+     */
+    private void processUpdateQueue() throws IOException {
+        assert Thread.holdsLock(this);
+
+        // bump lastUpdate to ensure that the interest ops are changed at most
+        // once per bulk update
+        lastUpdate++;
+
+        synchronized (updateLock) {
+            SelectionKeyImpl ski;
+
+            // new registrations
+            while ((ski = newKeys.pollFirst()) != null) {
+                if (ski.isValid()) {
+                    SelChImpl ch = ski.channel;
+                    int fd = ch.getFDVal();
+                    RegEntry previous = fdToRegEntry.put(fd, new RegEntry(ski));
+                    assert previous == null;
+                }
+            }
+
+            // changes to interest ops
+            assert updateKeys.size() == updateOps.size();
+            while ((ski = updateKeys.pollFirst()) != null) {
+                int ops = updateOps.pollFirst();
+                int fd = ski.channel.getFDVal();
+                RegEntry e = fdToRegEntry.get(fd);
+                if (ski.isValid() && (e != null) && (e.lastUpdate != lastUpdate)) {
+                    assert e.ski == ski;
+                    if ((ops != e.registeredOps)) {
+                        if (ops == 0) {
+                            port_dissociate(pfd, PORT_SOURCE_FD, fd);
+                        } else {
+                            port_associate(pfd, PORT_SOURCE_FD, fd, ops);
+                        }
+                        e.registeredOps = ops;
+                    }
+                    e.lastUpdate = lastUpdate;
+                }
             }
         }
-        return numKeysUpdated;
     }
 
-    private int updateSelectedKeys(int entries) {
+    /**
+     * Process the port events. This method updates the keys of file descriptors
+     * that were polled. It also re-queues the key so that the file descriptor
+     * is re-associated at the next select operation.
+     *
+     * @return the number of selection keys updated.
+     */
+    private int processPortEvents(int numEvents) throws IOException {
+        assert Thread.holdsLock(this);
+        assert Thread.holdsLock(nioSelectedKeys());
+
         int numKeysUpdated = 0;
-        for (int i=0; i<entries; i++) {
-            int nextFD = pollWrapper.getDescriptor(i);
-            SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
-            if (ski != null) {
-                int rOps = pollWrapper.getEventOps(i);
-                if (selectedKeys.contains(ski)) {
-                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
-                        numKeysUpdated++;
+        boolean interrupted = false;
+
+        synchronized (updateLock) {
+            for (int i = 0; i < numEvents; i++) {
+                short source = getSource(i);
+                if (source == PORT_SOURCE_FD) {
+                    int fd = getDescriptor(i);
+                    RegEntry e = fdToRegEntry.get(fd);
+                    if (e != null) {
+                        SelectionKeyImpl ski = e.ski;
+                        int rOps = getEventOps(i);
+                        if (selectedKeys.contains(ski)) {
+                            if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
+                                numKeysUpdated++;
+                            }
+                        } else {
+                            ski.channel.translateAndSetReadyOps(rOps, ski);
+                            if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
+                                selectedKeys.add(ski);
+                                numKeysUpdated++;
+                            }
+                        }
+
+                        // queue selection key so that it is re-associated at
+                        // next select. Push to end of deque so that changes to
+                        // the interest ops are processed first
+                        updateKeys.addLast(ski);
+                        updateOps.addLast(e.registeredOps);
+                        e.registeredOps = 0;
                     }
+                } else if (source == PORT_SOURCE_USER) {
+                    interrupted = true;
                 } else {
-                    ski.channel.translateAndSetReadyOps(rOps, ski);
-                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
-                        selectedKeys.add(ski);
-                        numKeysUpdated++;
-                    }
+                    assert false;
                 }
             }
         }
+
+        if (interrupted) {
+            clearInterrupt();
+        }
         return numKeysUpdated;
     }
 
     @Override
     protected void implClose() throws IOException {
-        if (closed)
-            return;
-        closed = true;
+        assert !isOpen();
+        assert Thread.holdsLock(this);
+        assert Thread.holdsLock(nioKeys());
 
         // prevent further wakeup
         synchronized (interruptLock) {
             interruptTriggered = true;
         }
 
-        pollWrapper.close();
+        port_close(pfd);
+        pollArray.free();
 
         // Deregister channels
         Iterator<SelectionKey> i = keys.iterator();
@@ -137,42 +274,83 @@
 
     @Override
     protected void implRegister(SelectionKeyImpl ski) {
-        int fd = IOUtil.fdVal(ski.channel.getFD());
-        fdToKey.put(Integer.valueOf(fd), ski);
+        assert Thread.holdsLock(nioKeys());
+        ensureOpen();
+        synchronized (updateLock) {
+            newKeys.addLast(ski);
+        }
         keys.add(ski);
     }
 
     @Override
     protected void implDereg(SelectionKeyImpl ski) throws IOException {
-        int i = ski.getIndex();
-        assert (i >= 0);
+        assert !ski.isValid();
+        assert Thread.holdsLock(this);
+        assert Thread.holdsLock(nioKeys());
+        assert Thread.holdsLock(nioSelectedKeys());
+
         int fd = ski.channel.getFDVal();
-        fdToKey.remove(Integer.valueOf(fd));
-        pollWrapper.release(fd);
-        ski.setIndex(-1);
+        RegEntry e = fdToRegEntry.remove(fd);
+        if (e != null && e.registeredOps != 0) {
+            port_dissociate(pfd, PORT_SOURCE_FD, fd);
+        }
+
+        selectedKeys.remove(ski);
         keys.remove(ski);
-        selectedKeys.remove(ski);
-        deregister((AbstractSelectionKey)ski);
+
+        // remove from channel's key set
+        deregister(ski);
+
         SelectableChannel selch = ski.channel();
         if (!selch.isOpen() && !selch.isRegistered())
-            ((SelChImpl)selch).kill();
+            ((SelChImpl) selch).kill();
     }
 
     @Override
-    public void putEventOps(SelectionKeyImpl sk, int ops) {
+    public void putEventOps(SelectionKeyImpl ski, int ops) {
         ensureOpen();
-        int fd = sk.channel.getFDVal();
-        pollWrapper.setInterest(fd, ops);
+        synchronized (updateLock) {
+            // push to front of deque so that it processed before other
+            // updates for the same key.
+            updateOps.addFirst(ops);
+            updateKeys.addFirst(ski);
+        }
     }
 
     @Override
     public Selector wakeup() {
         synchronized (interruptLock) {
             if (!interruptTriggered) {
-                pollWrapper.interrupt();
+                try {
+                    port_send(pfd, 0);
+                } catch (IOException ioe) {
+                    throw new InternalError(ioe);
+                }
                 interruptTriggered = true;
             }
         }
         return this;
     }
+
+    private void clearInterrupt() throws IOException {
+        synchronized (interruptLock) {
+            interruptTriggered = false;
+        }
+    }
+
+    private short getSource(int i) {
+        int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_SOURCE;
+        return pollArray.getShort(offset);
+    }
+
+    private int getEventOps(int i) {
+        int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_EVENTS;
+        return pollArray.getInt(offset);
+    }
+
+    private int getDescriptor(int i) {
+        //assert Unsafe.getUnsafe().addressSize() == 8;
+        int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_OBJECT;
+        return (int) pollArray.getLong(offset);
+    }
 }