src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java
changeset 49493 814bd31f8da0
parent 49290 07779973cbe2
child 49526 cad4c844902a
equal deleted inserted replaced
49492:f1a8ec1a6972 49493:814bd31f8da0
    25 
    25 
    26 package sun.nio.ch;
    26 package sun.nio.ch;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
    29 import java.nio.channels.ClosedSelectorException;
    29 import java.nio.channels.ClosedSelectorException;
    30 import java.nio.channels.SelectableChannel;
       
    31 import java.nio.channels.SelectionKey;
       
    32 import java.nio.channels.Selector;
    30 import java.nio.channels.Selector;
    33 import java.nio.channels.spi.SelectorProvider;
    31 import java.nio.channels.spi.SelectorProvider;
    34 import java.util.ArrayDeque;
    32 import java.util.ArrayDeque;
    35 import java.util.BitSet;
       
    36 import java.util.Deque;
    33 import java.util.Deque;
    37 import java.util.HashMap;
    34 import java.util.HashMap;
    38 import java.util.Iterator;
       
    39 import java.util.Map;
    35 import java.util.Map;
    40 import java.util.concurrent.TimeUnit;
    36 import java.util.concurrent.TimeUnit;
    41 
    37 
    42 import static sun.nio.ch.KQueue.EVFILT_READ;
    38 import static sun.nio.ch.KQueue.EVFILT_READ;
    43 import static sun.nio.ch.KQueue.EVFILT_WRITE;
    39 import static sun.nio.ch.KQueue.EVFILT_WRITE;
    64     private final int fd1;
    60     private final int fd1;
    65 
    61 
    66     // maps file descriptor to selection key, synchronize on selector
    62     // maps file descriptor to selection key, synchronize on selector
    67     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
    63     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
    68 
    64 
    69     // file descriptors registered with kqueue, synchronize on selector
       
    70     private final BitSet registeredReadFilter = new BitSet();
       
    71     private final BitSet registeredWriteFilter = new BitSet();
       
    72 
       
    73     // pending new registrations/updates, queued by implRegister and putEventOps
    65     // pending new registrations/updates, queued by implRegister and putEventOps
    74     private final Object updateLock = new Object();
    66     private final Object updateLock = new Object();
    75     private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
    67     private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
    76     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
    68     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
    77     private final Deque<Integer> updateOps = new ArrayDeque<>();
    69     private final Deque<Integer> updateEvents = new ArrayDeque<>();
    78 
    70 
    79     // interrupt triggering and clearing
    71     // interrupt triggering and clearing
    80     private final Object interruptLock = new Object();
    72     private final Object interruptLock = new Object();
    81     private boolean interruptTriggered;
    73     private boolean interruptTriggered;
    82 
    74 
   111 
   103 
   112     @Override
   104     @Override
   113     protected int doSelect(long timeout) throws IOException {
   105     protected int doSelect(long timeout) throws IOException {
   114         assert Thread.holdsLock(this);
   106         assert Thread.holdsLock(this);
   115 
   107 
       
   108         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
       
   109         boolean blocking = (to != 0);
       
   110         boolean timedPoll = (to > 0);
       
   111 
   116         int numEntries;
   112         int numEntries;
   117         processUpdateQueue();
   113         processUpdateQueue();
   118         processDeregisterQueue();
   114         processDeregisterQueue();
   119         try {
   115         try {
   120             begin();
   116             begin(blocking);
   121 
   117 
   122             long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
       
   123             boolean timedPoll = (to > 0);
       
   124             do {
   118             do {
   125                 long startTime = timedPoll ? System.nanoTime() : 0;
   119                 long startTime = timedPoll ? System.nanoTime() : 0;
   126                 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
   120                 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
   127                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
   121                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
   128                     // timed poll interrupted so need to adjust timeout
   122                     // timed poll interrupted so need to adjust timeout
   135                 }
   129                 }
   136             } while (numEntries == IOStatus.INTERRUPTED);
   130             } while (numEntries == IOStatus.INTERRUPTED);
   137             assert IOStatus.check(numEntries);
   131             assert IOStatus.check(numEntries);
   138 
   132 
   139         } finally {
   133         } finally {
   140             end();
   134             end(blocking);
   141         }
   135         }
   142         processDeregisterQueue();
   136         processDeregisterQueue();
   143         return updateSelectedKeys(numEntries);
   137         return updateSelectedKeys(numEntries);
   144     }
   138     }
   145 
   139 
   153             SelectionKeyImpl ski;
   147             SelectionKeyImpl ski;
   154 
   148 
   155             // new registrations
   149             // new registrations
   156             while ((ski = newKeys.pollFirst()) != null) {
   150             while ((ski = newKeys.pollFirst()) != null) {
   157                 if (ski.isValid()) {
   151                 if (ski.isValid()) {
   158                     SelChImpl ch = ski.channel;
   152                     int fd = ski.channel.getFDVal();
   159                     int fd = ch.getFDVal();
       
   160                     SelectionKeyImpl previous = fdToKey.put(fd, ski);
   153                     SelectionKeyImpl previous = fdToKey.put(fd, ski);
   161                     assert previous == null;
   154                     assert previous == null;
   162                     assert registeredReadFilter.get(fd) == false;
   155                     assert ski.registeredEvents() == 0;
   163                     assert registeredWriteFilter.get(fd) == false;
       
   164                 }
   156                 }
   165             }
   157             }
   166 
   158 
   167             // changes to interest ops
   159             // changes to interest ops
   168             assert updateKeys.size() == updateOps.size();
   160             assert updateKeys.size() == updateKeys.size();
   169             while ((ski = updateKeys.pollFirst()) != null) {
   161             while ((ski = updateKeys.pollFirst()) != null) {
   170                 int ops = updateOps.pollFirst();
   162                 int newEvents = updateEvents.pollFirst();
   171                 int fd = ski.channel.getFDVal();
   163                 int fd = ski.channel.getFDVal();
   172                 if (ski.isValid() && fdToKey.containsKey(fd)) {
   164                 if (ski.isValid() && fdToKey.containsKey(fd)) {
   173                     // add or delete interest in read events
   165                     int registeredEvents = ski.registeredEvents();
   174                     if (registeredReadFilter.get(fd)) {
   166                     if (newEvents != registeredEvents) {
   175                         if ((ops & Net.POLLIN) == 0) {
   167 
   176                             KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
   168                         // add or delete interest in read events
   177                             registeredReadFilter.clear(fd);
   169                         if ((registeredEvents & Net.POLLIN) != 0) {
       
   170                             if ((newEvents & Net.POLLIN) == 0) {
       
   171                                 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
       
   172                             }
       
   173                         } else if ((newEvents & Net.POLLIN) != 0) {
       
   174                             KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
   178                         }
   175                         }
   179                     } else if ((ops & Net.POLLIN) != 0) {
   176 
   180                         KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
   177                         // add or delete interest in write events
   181                         registeredReadFilter.set(fd);
   178                         if ((registeredEvents & Net.POLLOUT) != 0) {
       
   179                             if ((newEvents & Net.POLLOUT) == 0) {
       
   180                                 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
       
   181                             }
       
   182                         } else if ((newEvents & Net.POLLOUT) != 0) {
       
   183                             KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
       
   184                         }
       
   185 
       
   186                         ski.registeredEvents(newEvents);
   182                     }
   187                     }
   183 
       
   184                     // add or delete interest in write events
       
   185                     if (registeredWriteFilter.get(fd)) {
       
   186                         if ((ops & Net.POLLOUT) == 0) {
       
   187                             KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
       
   188                             registeredWriteFilter.clear(fd);
       
   189                         }
       
   190                     } else if ((ops & Net.POLLOUT) != 0) {
       
   191                         KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
       
   192                         registeredWriteFilter.set(fd);
       
   193                     }
       
   194                 }
   188                 }
   195             }
   189             }
   196         }
   190         }
   197     }
   191     }
   198 
   192 
   199     /**
   193     /**
   200      * Update the keys whose fd's have been selected by kqueue.
   194      * Update the keys of file descriptors that were polled and add them to
   201      * Add the ready keys to the selected key set.
   195      * the selected-key set.
   202      * If the interrupt fd has been selected, drain it and clear the interrupt.
   196      * If the interrupt fd has been selected, drain it and clear the interrupt.
   203      */
   197      */
   204     private int updateSelectedKeys(int numEntries) throws IOException {
   198     private int updateSelectedKeys(int numEntries) throws IOException {
   205         assert Thread.holdsLock(this);
   199         assert Thread.holdsLock(this);
   206         assert Thread.holdsLock(nioSelectedKeys());
   200         assert Thread.holdsLock(nioSelectedKeys());
   263 
   257 
   264     @Override
   258     @Override
   265     protected void implClose() throws IOException {
   259     protected void implClose() throws IOException {
   266         assert !isOpen();
   260         assert !isOpen();
   267         assert Thread.holdsLock(this);
   261         assert Thread.holdsLock(this);
   268         assert Thread.holdsLock(nioKeys());
       
   269 
   262 
   270         // prevent further wakeup
   263         // prevent further wakeup
   271         synchronized (interruptLock) {
   264         synchronized (interruptLock) {
   272             interruptTriggered = true;
   265             interruptTriggered = true;
   273         }
   266         }
   275         FileDispatcherImpl.closeIntFD(kqfd);
   268         FileDispatcherImpl.closeIntFD(kqfd);
   276         KQueue.freePollArray(pollArrayAddress);
   269         KQueue.freePollArray(pollArrayAddress);
   277 
   270 
   278         FileDispatcherImpl.closeIntFD(fd0);
   271         FileDispatcherImpl.closeIntFD(fd0);
   279         FileDispatcherImpl.closeIntFD(fd1);
   272         FileDispatcherImpl.closeIntFD(fd1);
   280 
       
   281         // Deregister channels
       
   282         Iterator<SelectionKey> i = keys.iterator();
       
   283         while (i.hasNext()) {
       
   284             SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
       
   285             deregister(ski);
       
   286             SelectableChannel selch = ski.channel();
       
   287             if (!selch.isOpen() && !selch.isRegistered())
       
   288                 ((SelChImpl)selch).kill();
       
   289             i.remove();
       
   290         }
       
   291     }
   273     }
   292 
   274 
   293     @Override
   275     @Override
   294     protected void implRegister(SelectionKeyImpl ski) {
   276     protected void implRegister(SelectionKeyImpl ski) {
   295         assert Thread.holdsLock(nioKeys());
       
   296         ensureOpen();
   277         ensureOpen();
   297         synchronized (updateLock) {
   278         synchronized (updateLock) {
   298             newKeys.addLast(ski);
   279             newKeys.addLast(ski);
   299         }
   280         }
   300         keys.add(ski);
       
   301     }
   281     }
   302 
   282 
   303     @Override
   283     @Override
   304     protected void implDereg(SelectionKeyImpl ski) throws IOException {
   284     protected void implDereg(SelectionKeyImpl ski) throws IOException {
   305         assert !ski.isValid();
   285         assert !ski.isValid();
   306         assert Thread.holdsLock(this);
   286         assert Thread.holdsLock(this);
   307         assert Thread.holdsLock(nioKeys());
       
   308         assert Thread.holdsLock(nioSelectedKeys());
       
   309 
   287 
   310         int fd = ski.channel.getFDVal();
   288         int fd = ski.channel.getFDVal();
   311         fdToKey.remove(fd);
   289         int registeredEvents = ski.registeredEvents();
   312         if (registeredReadFilter.get(fd)) {
   290         if (fdToKey.remove(fd) != null) {
   313             KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
   291             if (registeredEvents != 0) {
   314             registeredReadFilter.clear(fd);
   292                 if ((registeredEvents & Net.POLLIN) != 0)
   315         }
   293                     KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
   316         if (registeredWriteFilter.get(fd)) {
   294                 if ((registeredEvents & Net.POLLOUT) != 0)
   317             KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
   295                     KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
   318             registeredWriteFilter.clear(fd);
   296                 ski.registeredEvents(0);
   319         }
   297             }
   320 
   298         } else {
   321         selectedKeys.remove(ski);
   299             assert registeredEvents == 0;
   322         keys.remove(ski);
   300         }
   323 
   301     }
   324         // remove from channel's key set
   302 
   325         deregister(ski);
   303     @Override
   326 
   304     public void putEventOps(SelectionKeyImpl ski, int events) {
   327         SelectableChannel selch = ski.channel();
       
   328         if (!selch.isOpen() && !selch.isRegistered())
       
   329             ((SelChImpl) selch).kill();
       
   330     }
       
   331 
       
   332     @Override
       
   333     public void putEventOps(SelectionKeyImpl ski, int ops) {
       
   334         ensureOpen();
   305         ensureOpen();
   335         synchronized (updateLock) {
   306         synchronized (updateLock) {
   336             updateOps.addLast(ops);   // ops first in case adding the key fails
   307             updateEvents.addLast(events);  // events first in case adding key fails
   337             updateKeys.addLast(ski);
   308             updateKeys.addLast(ski);
   338         }
   309         }
   339     }
   310     }
   340 
   311 
   341     @Override
   312     @Override