src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
changeset 49493 814bd31f8da0
parent 49290 07779973cbe2
child 49526 cad4c844902a
equal deleted inserted replaced
49492:f1a8ec1a6972 49493:814bd31f8da0
    25 
    25 
    26 package sun.nio.ch;
    26 package sun.nio.ch;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
    29 import java.nio.channels.ClosedSelectorException;
    29 import java.nio.channels.ClosedSelectorException;
    30 import java.nio.channels.SelectableChannel;
       
    31 import java.nio.channels.SelectionKey;
       
    32 import java.nio.channels.Selector;
    30 import java.nio.channels.Selector;
    33 import java.nio.channels.spi.SelectorProvider;
    31 import java.nio.channels.spi.SelectorProvider;
    34 import java.util.ArrayDeque;
    32 import java.util.ArrayDeque;
    35 import java.util.BitSet;
       
    36 import java.util.Deque;
    33 import java.util.Deque;
    37 import java.util.HashMap;
    34 import java.util.HashMap;
    38 import java.util.Iterator;
       
    39 import java.util.Map;
    35 import java.util.Map;
    40 import java.util.concurrent.TimeUnit;
    36 import java.util.concurrent.TimeUnit;
    41 
    37 
    42 import static sun.nio.ch.EPoll.EPOLLIN;
    38 import static sun.nio.ch.EPoll.EPOLLIN;
    43 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
    39 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
    65     private final int fd1;
    61     private final int fd1;
    66 
    62 
    67     // maps file descriptor to selection key, synchronize on selector
    63     // maps file descriptor to selection key, synchronize on selector
    68     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
    64     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
    69 
    65 
    70     // file descriptors registered with epoll, synchronize on selector
    66     // pending new registrations/updates, queued by implRegister and putEventOps
    71     private final BitSet registered = new BitSet();
       
    72 
       
    73     // pending new registrations/updates, queued by implRegister and putEventOps
       
    74     private final Object updateLock = new Object();
    67     private final Object updateLock = new Object();
    75     private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
    68     private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
    76     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
    69     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
    77     private final Deque<Integer> updateOps = new ArrayDeque<>();
    70     private final Deque<Integer> updateEvents = new ArrayDeque<>();
    78 
    71 
    79     // interrupt triggering and clearing
    72     // interrupt triggering and clearing
    80     private final Object interruptLock = new Object();
    73     private final Object interruptLock = new Object();
    81     private boolean interruptTriggered;
    74     private boolean interruptTriggered;
    82 
    75 
   111 
   104 
   112     @Override
   105     @Override
   113     protected int doSelect(long timeout) throws IOException {
   106     protected int doSelect(long timeout) throws IOException {
   114         assert Thread.holdsLock(this);
   107         assert Thread.holdsLock(this);
   115 
   108 
       
   109         // epoll_wait timeout is int
       
   110         int to = (int) Math.min(timeout, Integer.MAX_VALUE);
       
   111         boolean blocking = (to != 0);
       
   112         boolean timedPoll = (to > 0);
       
   113 
   116         int numEntries;
   114         int numEntries;
   117         processUpdateQueue();
   115         processUpdateQueue();
   118         processDeregisterQueue();
   116         processDeregisterQueue();
   119         try {
   117         try {
   120             begin();
   118             begin(blocking);
   121 
   119 
   122             // epoll_wait timeout is int
       
   123             int to = (int) Math.min(timeout, Integer.MAX_VALUE);
       
   124             boolean timedPoll = (to > 0);
       
   125             do {
   120             do {
   126                 long startTime = timedPoll ? System.nanoTime() : 0;
   121                 long startTime = timedPoll ? System.nanoTime() : 0;
   127                 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
   122                 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
   128                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
   123                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
   129                     // timed poll interrupted so need to adjust timeout
   124                     // timed poll interrupted so need to adjust timeout
   136                 }
   131                 }
   137             } while (numEntries == IOStatus.INTERRUPTED);
   132             } while (numEntries == IOStatus.INTERRUPTED);
   138             assert IOStatus.check(numEntries);
   133             assert IOStatus.check(numEntries);
   139 
   134 
   140         } finally {
   135         } finally {
   141             end();
   136             end(blocking);
   142         }
   137         }
   143         processDeregisterQueue();
   138         processDeregisterQueue();
   144         return updateSelectedKeys(numEntries);
   139         return updateSelectedKeys(numEntries);
   145     }
   140     }
   146 
   141 
   154             SelectionKeyImpl ski;
   149             SelectionKeyImpl ski;
   155 
   150 
   156             // new registrations
   151             // new registrations
   157             while ((ski = newKeys.pollFirst()) != null) {
   152             while ((ski = newKeys.pollFirst()) != null) {
   158                 if (ski.isValid()) {
   153                 if (ski.isValid()) {
   159                     SelChImpl ch = ski.channel;
   154                     int fd = ski.channel.getFDVal();
   160                     int fd = ch.getFDVal();
       
   161                     SelectionKeyImpl previous = fdToKey.put(fd, ski);
   155                     SelectionKeyImpl previous = fdToKey.put(fd, ski);
   162                     assert previous == null;
   156                     assert previous == null;
   163                     assert registered.get(fd) == false;
   157                     assert ski.registeredEvents() == 0;
   164                 }
   158                 }
   165             }
   159             }
   166 
   160 
   167             // changes to interest ops
   161             // changes to interest ops
   168             assert updateKeys.size() == updateOps.size();
   162             assert updateKeys.size() == updateEvents.size();
   169             while ((ski = updateKeys.pollFirst()) != null) {
   163             while ((ski = updateKeys.pollFirst()) != null) {
   170                 int ops = updateOps.pollFirst();
   164                 int newEvents = updateEvents.pollFirst();
   171                 int fd = ski.channel.getFDVal();
   165                 int fd = ski.channel.getFDVal();
   172                 if (ski.isValid() && fdToKey.containsKey(fd)) {
   166                 if (ski.isValid() && fdToKey.containsKey(fd)) {
   173                     if (registered.get(fd)) {
   167                     int registeredEvents = ski.registeredEvents();
   174                         if (ops == 0) {
   168                     if (newEvents != registeredEvents) {
       
   169                         if (newEvents == 0) {
   175                             // remove from epoll
   170                             // remove from epoll
   176                             EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
   171                             EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
   177                             registered.clear(fd);
       
   178                         } else {
   172                         } else {
   179                             // modify events
   173                             if (registeredEvents == 0) {
   180                             EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, ops);
   174                                 // add to epoll
       
   175                                 EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
       
   176                             } else {
       
   177                                 // modify events
       
   178                                 EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
       
   179                             }
   181                         }
   180                         }
   182                     } else if (ops != 0) {
   181                         ski.registeredEvents(newEvents);
   183                         // add to epoll
       
   184                         EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, ops);
       
   185                         registered.set(fd);
       
   186                     }
   182                     }
   187                 }
   183                 }
   188             }
   184             }
   189         }
   185         }
   190     }
   186     }
   191 
   187 
   192     /**
   188     /**
   193      * Update the keys whose fd's have been selected by the epoll.
   189      * Update the keys of file descriptors that were polled and add them to
   194      * Add the ready keys to the ready queue.
   190      * the selected-key set.
       
   191      * If the interrupt fd has been selected, drain it and clear the interrupt.
   195      */
   192      */
   196     private int updateSelectedKeys(int numEntries) throws IOException {
   193     private int updateSelectedKeys(int numEntries) throws IOException {
   197         assert Thread.holdsLock(this);
   194         assert Thread.holdsLock(this);
   198         assert Thread.holdsLock(nioSelectedKeys());
   195         assert Thread.holdsLock(nioSelectedKeys());
   199 
   196 
   231     }
   228     }
   232 
   229 
   233     @Override
   230     @Override
   234     protected void implClose() throws IOException {
   231     protected void implClose() throws IOException {
   235         assert Thread.holdsLock(this);
   232         assert Thread.holdsLock(this);
   236         assert Thread.holdsLock(nioKeys());
       
   237 
   233 
   238         // prevent further wakeup
   234         // prevent further wakeup
   239         synchronized (interruptLock) {
   235         synchronized (interruptLock) {
   240             interruptTriggered = true;
   236             interruptTriggered = true;
   241         }
   237         }
   243         FileDispatcherImpl.closeIntFD(epfd);
   239         FileDispatcherImpl.closeIntFD(epfd);
   244         EPoll.freePollArray(pollArrayAddress);
   240         EPoll.freePollArray(pollArrayAddress);
   245 
   241 
   246         FileDispatcherImpl.closeIntFD(fd0);
   242         FileDispatcherImpl.closeIntFD(fd0);
   247         FileDispatcherImpl.closeIntFD(fd1);
   243         FileDispatcherImpl.closeIntFD(fd1);
   248 
       
   249         // Deregister channels
       
   250         Iterator<SelectionKey> i = keys.iterator();
       
   251         while (i.hasNext()) {
       
   252             SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
       
   253             deregister(ski);
       
   254             SelectableChannel selch = ski.channel();
       
   255             if (!selch.isOpen() && !selch.isRegistered())
       
   256                 ((SelChImpl)selch).kill();
       
   257             i.remove();
       
   258         }
       
   259     }
   244     }
   260 
   245 
   261     @Override
   246     @Override
   262     protected void implRegister(SelectionKeyImpl ski) {
   247     protected void implRegister(SelectionKeyImpl ski) {
   263         assert Thread.holdsLock(nioKeys());
       
   264         ensureOpen();
   248         ensureOpen();
   265         synchronized (updateLock) {
   249         synchronized (updateLock) {
   266             newKeys.addLast(ski);
   250             newKeys.addLast(ski);
   267         }
   251         }
   268         keys.add(ski);
       
   269     }
   252     }
   270 
   253 
   271     @Override
   254     @Override
   272     protected void implDereg(SelectionKeyImpl ski) throws IOException {
   255     protected void implDereg(SelectionKeyImpl ski) throws IOException {
   273         assert !ski.isValid();
   256         assert !ski.isValid();
   274         assert Thread.holdsLock(this);
   257         assert Thread.holdsLock(this);
   275         assert Thread.holdsLock(nioKeys());
       
   276         assert Thread.holdsLock(nioSelectedKeys());
       
   277 
   258 
   278         int fd = ski.channel.getFDVal();
   259         int fd = ski.channel.getFDVal();
   279         fdToKey.remove(fd);
   260         if (fdToKey.remove(fd) != null) {
   280         if (registered.get(fd)) {
   261             if (ski.registeredEvents() != 0) {
   281             EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
   262                 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
   282             registered.clear(fd);
   263                 ski.registeredEvents(0);
   283         }
   264             }
   284 
   265         } else {
   285         selectedKeys.remove(ski);
   266             assert ski.registeredEvents() == 0;
   286         keys.remove(ski);
   267         }
   287 
   268     }
   288         // remove from channel's key set
   269 
   289         deregister(ski);
   270     @Override
   290 
   271     public void putEventOps(SelectionKeyImpl ski, int events) {
   291         SelectableChannel selch = ski.channel();
       
   292         if (!selch.isOpen() && !selch.isRegistered())
       
   293             ((SelChImpl) selch).kill();
       
   294     }
       
   295 
       
   296     @Override
       
   297     public void putEventOps(SelectionKeyImpl ski, int ops) {
       
   298         ensureOpen();
   272         ensureOpen();
   299         synchronized (updateLock) {
   273         synchronized (updateLock) {
   300             updateOps.addLast(ops);   // ops first in case adding the key fails
   274             updateEvents.addLast(events);  // events first in case adding key fails
   301             updateKeys.addLast(ski);
   275             updateKeys.addLast(ski);
   302         }
   276         }
   303     }
   277     }
   304 
   278 
   305     @Override
   279     @Override