src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java
changeset 49290 07779973cbe2
parent 49248 15a0e60c8b97
child 49493 814bd31f8da0
equal deleted inserted replaced
49289:148e29df1644 49290:07779973cbe2
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    22  * or visit www.oracle.com if you need additional information or have any
    22  * or visit www.oracle.com if you need additional information or have any
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
    25 
    26 /*
       
    27  * KQueueSelectorImpl.java
       
    28  * Implementation of Selector using FreeBSD / Mac OS X kqueues
       
    29  */
       
    30 
       
    31 package sun.nio.ch;
    26 package sun.nio.ch;
    32 
    27 
    33 import java.io.IOException;
    28 import java.io.IOException;
    34 import java.nio.channels.ClosedSelectorException;
    29 import java.nio.channels.ClosedSelectorException;
    35 import java.nio.channels.SelectableChannel;
    30 import java.nio.channels.SelectableChannel;
    36 import java.nio.channels.SelectionKey;
    31 import java.nio.channels.SelectionKey;
    37 import java.nio.channels.Selector;
    32 import java.nio.channels.Selector;
    38 import java.nio.channels.spi.SelectorProvider;
    33 import java.nio.channels.spi.SelectorProvider;
       
    34 import java.util.ArrayDeque;
       
    35 import java.util.BitSet;
       
    36 import java.util.Deque;
    39 import java.util.HashMap;
    37 import java.util.HashMap;
    40 import java.util.Iterator;
    38 import java.util.Iterator;
    41 
    39 import java.util.Map;
    42 class KQueueSelectorImpl
    40 import java.util.concurrent.TimeUnit;
    43     extends SelectorImpl
    41 
    44 {
    42 import static sun.nio.ch.KQueue.EVFILT_READ;
    45     // File descriptors used for interrupt
    43 import static sun.nio.ch.KQueue.EVFILT_WRITE;
       
    44 import static sun.nio.ch.KQueue.EV_ADD;
       
    45 import static sun.nio.ch.KQueue.EV_DELETE;
       
    46 
       
    47 /**
       
    48  * KQueue based Selector implementation for macOS
       
    49  */
       
    50 
       
    51 class KQueueSelectorImpl extends SelectorImpl {
       
    52 
       
    53     // maximum number of events to poll in one call to kqueue
       
    54     private static final int MAX_KEVENTS = 256;
       
    55 
       
    56     // kqueue file descriptor
       
    57     private final int kqfd;
       
    58 
       
    59     // address of poll array (event list) when polling for pending events
       
    60     private final long pollArrayAddress;
       
    61 
       
    62     // file descriptors used for interrupt
    46     private final int fd0;
    63     private final int fd0;
    47     private final int fd1;
    64     private final int fd1;
    48 
    65 
    49     // The kqueue manipulator
    66     // maps file descriptor to selection key, synchronize on selector
    50     private final KQueueArrayWrapper kqueueWrapper;
    67     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
    51 
    68 
    52     // Map from a file descriptor to an entry containing the selection key
    69     // file descriptors registered with kqueue, synchronize on selector
    53     private final HashMap<Integer, MapEntry> fdMap;
    70     private final BitSet registeredReadFilter = new BitSet();
    54 
    71     private final BitSet registeredWriteFilter = new BitSet();
    55     // True if this Selector has been closed
    72 
    56     private boolean closed;
    73     // pending new registrations/updates, queued by implRegister and putEventOps
    57 
    74     private final Object updateLock = new Object();
    58     // Lock for interrupt triggering and clearing
    75     private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
       
    76     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
       
    77     private final Deque<Integer> updateOps = new ArrayDeque<>();
       
    78 
       
    79     // interrupt triggering and clearing
    59     private final Object interruptLock = new Object();
    80     private final Object interruptLock = new Object();
    60     private boolean interruptTriggered;
    81     private boolean interruptTriggered;
    61 
    82 
    62     // used by updateSelectedKeys to handle cases where the same file
    83     // used by updateSelectedKeys to handle cases where the same file
    63     // descriptor is polled by more than one filter
    84     // descriptor is polled by more than one filter
    64     private long updateCount;
    85     private int pollCount;
    65 
    86 
    66     // Used to map file descriptors to a selection key and "update count"
       
    67     // (see updateSelectedKeys for usage).
       
    68     private static class MapEntry {
       
    69         SelectionKeyImpl ski;
       
    70         long updateCount;
       
    71         MapEntry(SelectionKeyImpl ski) {
       
    72             this.ski = ski;
       
    73         }
       
    74     }
       
    75 
       
    76     /**
       
    77      * Package private constructor called by factory method in
       
    78      * the abstract superclass Selector.
       
    79      */
       
    80     KQueueSelectorImpl(SelectorProvider sp) throws IOException {
    87     KQueueSelectorImpl(SelectorProvider sp) throws IOException {
    81         super(sp);
    88         super(sp);
    82         long fds = IOUtil.makePipe(false);
    89 
    83         fd0 = (int)(fds >>> 32);
    90         this.kqfd = KQueue.create();
    84         fd1 = (int)fds;
    91         this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS);
       
    92 
    85         try {
    93         try {
    86             kqueueWrapper = new KQueueArrayWrapper(fd0, fd1);
    94             long fds = IOUtil.makePipe(false);
    87             fdMap = new HashMap<>();
    95             this.fd0 = (int) (fds >>> 32);
    88         } catch (Throwable t) {
    96             this.fd1 = (int) fds;
    89             try {
    97         } catch (IOException ioe) {
    90                 FileDispatcherImpl.closeIntFD(fd0);
    98             KQueue.freePollArray(pollArrayAddress);
    91             } catch (IOException ioe0) {
    99             FileDispatcherImpl.closeIntFD(kqfd);
    92                 t.addSuppressed(ioe0);
   100             throw ioe;
    93             }
   101         }
    94             try {
   102 
    95                 FileDispatcherImpl.closeIntFD(fd1);
   103         // register one end of the socket pair for wakeups
    96             } catch (IOException ioe1) {
   104         KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
    97                 t.addSuppressed(ioe1);
       
    98             }
       
    99             throw t;
       
   100         }
       
   101     }
   105     }
   102 
   106 
   103     private void ensureOpen() {
   107     private void ensureOpen() {
   104         if (closed)
   108         if (!isOpen())
   105             throw new ClosedSelectorException();
   109             throw new ClosedSelectorException();
   106     }
   110     }
   107 
   111 
   108     @Override
   112     @Override
   109     protected int doSelect(long timeout)
   113     protected int doSelect(long timeout) throws IOException {
   110         throws IOException
   114         assert Thread.holdsLock(this);
   111     {
   115 
   112         ensureOpen();
       
   113         int numEntries;
   116         int numEntries;
       
   117         processUpdateQueue();
   114         processDeregisterQueue();
   118         processDeregisterQueue();
   115         try {
   119         try {
   116             begin();
   120             begin();
   117             numEntries = kqueueWrapper.poll(timeout);
   121 
       
   122             long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
       
   123             boolean timedPoll = (to > 0);
       
   124             do {
       
   125                 long startTime = timedPoll ? System.nanoTime() : 0;
       
   126                 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
       
   127                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
       
   128                     // timed poll interrupted so need to adjust timeout
       
   129                     long adjust = System.nanoTime() - startTime;
       
   130                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
       
   131                     if (to <= 0) {
       
   132                         // timeout expired so no retry
       
   133                         numEntries = 0;
       
   134                     }
       
   135                 }
       
   136             } while (numEntries == IOStatus.INTERRUPTED);
       
   137             assert IOStatus.check(numEntries);
       
   138 
   118         } finally {
   139         } finally {
   119             end();
   140             end();
   120         }
   141         }
   121         processDeregisterQueue();
   142         processDeregisterQueue();
   122         return updateSelectedKeys(numEntries);
   143         return updateSelectedKeys(numEntries);
       
   144     }
       
   145 
       
   146     /**
       
   147      * Process new registrations and changes to the interest ops.
       
   148      */
       
   149     private void processUpdateQueue() {
       
   150         assert Thread.holdsLock(this);
       
   151 
       
   152         synchronized (updateLock) {
       
   153             SelectionKeyImpl ski;
       
   154 
       
   155             // new registrations
       
   156             while ((ski = newKeys.pollFirst()) != null) {
       
   157                 if (ski.isValid()) {
       
   158                     SelChImpl ch = ski.channel;
       
   159                     int fd = ch.getFDVal();
       
   160                     SelectionKeyImpl previous = fdToKey.put(fd, ski);
       
   161                     assert previous == null;
       
   162                     assert registeredReadFilter.get(fd) == false;
       
   163                     assert registeredWriteFilter.get(fd) == false;
       
   164                 }
       
   165             }
       
   166 
       
   167             // changes to interest ops
       
   168             assert updateKeys.size() == updateOps.size();
       
   169             while ((ski = updateKeys.pollFirst()) != null) {
       
   170                 int ops = updateOps.pollFirst();
       
   171                 int fd = ski.channel.getFDVal();
       
   172                 if (ski.isValid() && fdToKey.containsKey(fd)) {
       
   173                     // add or delete interest in read events
       
   174                     if (registeredReadFilter.get(fd)) {
       
   175                         if ((ops & Net.POLLIN) == 0) {
       
   176                             KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
       
   177                             registeredReadFilter.clear(fd);
       
   178                         }
       
   179                     } else if ((ops & Net.POLLIN) != 0) {
       
   180                         KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
       
   181                         registeredReadFilter.set(fd);
       
   182                     }
       
   183 
       
   184                     // add or delete interest in write events
       
   185                     if (registeredWriteFilter.get(fd)) {
       
   186                         if ((ops & Net.POLLOUT) == 0) {
       
   187                             KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
       
   188                             registeredWriteFilter.clear(fd);
       
   189                         }
       
   190                     } else if ((ops & Net.POLLOUT) != 0) {
       
   191                         KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
       
   192                         registeredWriteFilter.set(fd);
       
   193                     }
       
   194                 }
       
   195             }
       
   196         }
   123     }
   197     }
   124 
   198 
   125     /**
   199     /**
   126      * Update the keys whose fd's have been selected by kqueue.
   200      * Update the keys whose fd's have been selected by kqueue.
   127      * Add the ready keys to the selected key set.
   201      * Add the ready keys to the selected key set.
   128      * If the interrupt fd has been selected, drain it and clear the interrupt.
   202      * If the interrupt fd has been selected, drain it and clear the interrupt.
   129      */
   203      */
   130     private int updateSelectedKeys(int numEntries)
   204     private int updateSelectedKeys(int numEntries) throws IOException {
   131         throws IOException
   205         assert Thread.holdsLock(this);
   132     {
   206         assert Thread.holdsLock(nioSelectedKeys());
       
   207 
   133         int numKeysUpdated = 0;
   208         int numKeysUpdated = 0;
   134         boolean interrupted = false;
   209         boolean interrupted = false;
   135 
   210 
   136         // A file descriptor may be registered with kqueue with more than one
   211         // A file descriptor may be registered with kqueue with more than one
   137         // filter and so there may be more than one event for a fd. The update
   212         // filter and so there may be more than one event for a fd. The poll
   138         // count in the MapEntry tracks when the fd was last updated and this
   213         // count is incremented here and compared against the SelectionKey's
   139         // ensures that the ready ops are updated rather than replaced by a
   214         // "lastPolled" field. This ensures that the ready ops is updated rather
   140         // second or subsequent event.
   215         // than replaced when a file descriptor is polled by both the read and
   141         updateCount++;
   216         // write filter.
       
   217         pollCount++;
   142 
   218 
   143         for (int i = 0; i < numEntries; i++) {
   219         for (int i = 0; i < numEntries; i++) {
   144             int nextFD = kqueueWrapper.getDescriptor(i);
   220             long kevent = KQueue.getEvent(pollArrayAddress, i);
   145             if (nextFD == fd0) {
   221             int fd = KQueue.getDescriptor(kevent);
       
   222             if (fd == fd0) {
   146                 interrupted = true;
   223                 interrupted = true;
   147             } else {
   224             } else {
   148                 MapEntry me = fdMap.get(Integer.valueOf(nextFD));
   225                 SelectionKeyImpl ski = fdToKey.get(fd);
   149                 if (me != null) {
   226                 if (ski != null) {
   150                     int rOps = kqueueWrapper.getReventOps(i);
   227                     int rOps = 0;
   151                     SelectionKeyImpl ski = me.ski;
   228                     short filter = KQueue.getFilter(kevent);
       
   229                     if (filter == EVFILT_READ) {
       
   230                         rOps |= Net.POLLIN;
       
   231                     } else if (filter == EVFILT_WRITE) {
       
   232                         rOps |= Net.POLLOUT;
       
   233                     }
       
   234 
   152                     if (selectedKeys.contains(ski)) {
   235                     if (selectedKeys.contains(ski)) {
   153                         // first time this file descriptor has been encountered on this
   236                         // file descriptor may be polled more than once per poll
   154                         // update?
   237                         if (ski.lastPolled != pollCount) {
   155                         if (me.updateCount != updateCount) {
       
   156                             if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
   238                             if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
   157                                 numKeysUpdated++;
   239                                 numKeysUpdated++;
   158                                 me.updateCount = updateCount;
   240                                 ski.lastPolled = pollCount;
   159                             }
   241                             }
   160                         } else {
   242                         } else {
   161                             // ready ops have already been set on this update
   243                             // ready ops have already been set on this update
   162                             ski.channel.translateAndUpdateReadyOps(rOps, ski);
   244                             ski.channel.translateAndUpdateReadyOps(rOps, ski);
   163                         }
   245                         }
   164                     } else {
   246                     } else {
   165                         ski.channel.translateAndSetReadyOps(rOps, ski);
   247                         ski.channel.translateAndSetReadyOps(rOps, ski);
   166                         if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
   248                         if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
   167                             selectedKeys.add(ski);
   249                             selectedKeys.add(ski);
   168                             numKeysUpdated++;
   250                             numKeysUpdated++;
   169                             me.updateCount = updateCount;
   251                             ski.lastPolled = pollCount;
   170                         }
   252                         }
   171                     }
   253                     }
   172                 }
   254                 }
   173             }
   255             }
   174         }
   256         }
   179         return numKeysUpdated;
   261         return numKeysUpdated;
   180     }
   262     }
   181 
   263 
   182     @Override
   264     @Override
   183     protected void implClose() throws IOException {
   265     protected void implClose() throws IOException {
   184         if (!closed) {
   266         assert !isOpen();
   185             closed = true;
   267         assert Thread.holdsLock(this);
   186 
   268         assert Thread.holdsLock(nioKeys());
   187             // prevent further wakeup
   269 
   188             synchronized (interruptLock) {
   270         // prevent further wakeup
   189                 interruptTriggered = true;
   271         synchronized (interruptLock) {
   190             }
   272             interruptTriggered = true;
   191 
   273         }
   192             kqueueWrapper.close();
   274 
   193             FileDispatcherImpl.closeIntFD(fd0);
   275         FileDispatcherImpl.closeIntFD(kqfd);
   194             FileDispatcherImpl.closeIntFD(fd1);
   276         KQueue.freePollArray(pollArrayAddress);
   195 
   277 
   196             // Deregister channels
   278         FileDispatcherImpl.closeIntFD(fd0);
   197             Iterator<SelectionKey> i = keys.iterator();
   279         FileDispatcherImpl.closeIntFD(fd1);
   198             while (i.hasNext()) {
   280 
   199                 SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
   281         // Deregister channels
   200                 deregister(ski);
   282         Iterator<SelectionKey> i = keys.iterator();
   201                 SelectableChannel selch = ski.channel();
   283         while (i.hasNext()) {
   202                 if (!selch.isOpen() && !selch.isRegistered())
   284             SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
   203                     ((SelChImpl)selch).kill();
   285             deregister(ski);
   204                 i.remove();
   286             SelectableChannel selch = ski.channel();
   205             }
   287             if (!selch.isOpen() && !selch.isRegistered())
       
   288                 ((SelChImpl)selch).kill();
       
   289             i.remove();
   206         }
   290         }
   207     }
   291     }
   208 
   292 
   209     @Override
   293     @Override
   210     protected void implRegister(SelectionKeyImpl ski) {
   294     protected void implRegister(SelectionKeyImpl ski) {
       
   295         assert Thread.holdsLock(nioKeys());
   211         ensureOpen();
   296         ensureOpen();
   212         int fd = IOUtil.fdVal(ski.channel.getFD());
   297         synchronized (updateLock) {
   213         fdMap.put(Integer.valueOf(fd), new MapEntry(ski));
   298             newKeys.addLast(ski);
       
   299         }
   214         keys.add(ski);
   300         keys.add(ski);
   215     }
   301     }
   216 
   302 
   217     @Override
   303     @Override
   218     protected void implDereg(SelectionKeyImpl ski) throws IOException {
   304     protected void implDereg(SelectionKeyImpl ski) throws IOException {
       
   305         assert !ski.isValid();
       
   306         assert Thread.holdsLock(this);
       
   307         assert Thread.holdsLock(nioKeys());
       
   308         assert Thread.holdsLock(nioSelectedKeys());
       
   309 
   219         int fd = ski.channel.getFDVal();
   310         int fd = ski.channel.getFDVal();
   220         fdMap.remove(Integer.valueOf(fd));
   311         fdToKey.remove(fd);
   221         kqueueWrapper.release(ski.channel);
   312         if (registeredReadFilter.get(fd)) {
       
   313             KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
       
   314             registeredReadFilter.clear(fd);
       
   315         }
       
   316         if (registeredWriteFilter.get(fd)) {
       
   317             KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
       
   318             registeredWriteFilter.clear(fd);
       
   319         }
       
   320 
       
   321         selectedKeys.remove(ski);
   222         keys.remove(ski);
   322         keys.remove(ski);
   223         selectedKeys.remove(ski);
   323 
       
   324         // remove from channel's key set
   224         deregister(ski);
   325         deregister(ski);
       
   326 
   225         SelectableChannel selch = ski.channel();
   327         SelectableChannel selch = ski.channel();
   226         if (!selch.isOpen() && !selch.isRegistered())
   328         if (!selch.isOpen() && !selch.isRegistered())
   227             ((SelChImpl)selch).kill();
   329             ((SelChImpl) selch).kill();
   228     }
   330     }
   229 
   331 
   230     @Override
   332     @Override
   231     public void putEventOps(SelectionKeyImpl ski, int ops) {
   333     public void putEventOps(SelectionKeyImpl ski, int ops) {
   232         ensureOpen();
   334         ensureOpen();
   233         kqueueWrapper.setInterest(ski.channel, ops);
   335         synchronized (updateLock) {
       
   336             updateOps.addLast(ops);   // ops first in case adding the key fails
       
   337             updateKeys.addLast(ski);
       
   338         }
   234     }
   339     }
   235 
   340 
   236     @Override
   341     @Override
   237     public Selector wakeup() {
   342     public Selector wakeup() {
   238         synchronized (interruptLock) {
   343         synchronized (interruptLock) {
   239             if (!interruptTriggered) {
   344             if (!interruptTriggered) {
   240                 kqueueWrapper.interrupt();
   345                 try {
       
   346                     IOUtil.write1(fd1, (byte)0);
       
   347                 } catch (IOException ioe) {
       
   348                     throw new InternalError(ioe);
       
   349                 }
   241                 interruptTriggered = true;
   350                 interruptTriggered = true;
   242             }
   351             }
   243         }
   352         }
   244         return this;
   353         return this;
   245     }
   354     }