src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
changeset 49290 07779973cbe2
parent 49248 15a0e60c8b97
child 49493 814bd31f8da0
equal deleted inserted replaced
49289:148e29df1644 49290:07779973cbe2
    24  */
    24  */
    25 
    25 
    26 package sun.nio.ch;
    26 package sun.nio.ch;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
    29 import java.nio.channels.*;
    29 import java.nio.channels.ClosedSelectorException;
    30 import java.nio.channels.spi.*;
    30 import java.nio.channels.SelectableChannel;
    31 import java.util.*;
    31 import java.nio.channels.SelectionKey;
       
    32 import java.nio.channels.Selector;
       
    33 import java.nio.channels.spi.SelectorProvider;
       
    34 import java.util.ArrayDeque;
       
    35 import java.util.BitSet;
       
    36 import java.util.Deque;
       
    37 import java.util.HashMap;
       
    38 import java.util.Iterator;
       
    39 import java.util.Map;
       
    40 import java.util.concurrent.TimeUnit;
       
    41 
       
    42 import static sun.nio.ch.EPoll.EPOLLIN;
       
    43 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
       
    44 import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
       
    45 import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
       
    46 
    32 
    47 
    33 /**
    48 /**
    34  * An implementation of Selector for Linux 2.6+ kernels that uses
    49  * Linux epoll based Selector implementation
    35  * the epoll event notification facility.
       
    36  */
    50  */
    37 class EPollSelectorImpl
    51 
    38     extends SelectorImpl
    52 class EPollSelectorImpl extends SelectorImpl {
    39 {
    53 
    40     // File descriptors used for interrupt
    54     // maximum number of events to poll in one call to epoll_wait
       
    55     private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);
       
    56 
       
    57     // epoll file descriptor
       
    58     private final int epfd;
       
    59 
       
    60     // address of poll array when polling with epoll_wait
       
    61     private final long pollArrayAddress;
       
    62 
       
    63     // file descriptors used for interrupt
    41     private final int fd0;
    64     private final int fd0;
    42     private final int fd1;
    65     private final int fd1;
    43 
    66 
    44     // The poll object
    67     // maps file descriptor to selection key, synchronize on selector
    45     private final EPollArrayWrapper pollWrapper;
    68     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
    46 
    69 
    47     // Maps from file descriptors to keys
    70     // file descriptors registered with epoll, synchronize on selector
    48     private final Map<Integer, SelectionKeyImpl> fdToKey;
    71     private final BitSet registered = new BitSet();
    49 
    72 
    50     // True if this Selector has been closed
    73     // pending new registrations/updates, queued by implRegister and putEventOps
    51     private volatile boolean closed;
    74     private final Object updateLock = new Object();
    52 
    75     private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
    53     // Lock for interrupt triggering and clearing
    76     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
       
    77     private final Deque<Integer> updateOps = new ArrayDeque<>();
       
    78 
       
    79     // interrupt triggering and clearing
    54     private final Object interruptLock = new Object();
    80     private final Object interruptLock = new Object();
    55     private boolean interruptTriggered = false;
    81     private boolean interruptTriggered;
    56 
    82 
    57     /**
    83     /**
    58      * Package private constructor called by factory method in
    84      * Package private constructor called by factory method in
    59      * the abstract superclass Selector.
    85      * the abstract superclass Selector.
    60      */
    86      */
    61     EPollSelectorImpl(SelectorProvider sp) throws IOException {
    87     EPollSelectorImpl(SelectorProvider sp) throws IOException {
    62         super(sp);
    88         super(sp);
    63         long pipeFds = IOUtil.makePipe(false);
    89 
    64         fd0 = (int) (pipeFds >>> 32);
    90         this.epfd = EPoll.create();
    65         fd1 = (int) pipeFds;
    91         this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
       
    92 
    66         try {
    93         try {
    67             pollWrapper = new EPollArrayWrapper(fd0, fd1);
    94             long fds = IOUtil.makePipe(false);
    68             fdToKey = new HashMap<>();
    95             this.fd0 = (int) (fds >>> 32);
    69         } catch (Throwable t) {
    96             this.fd1 = (int) fds;
    70             try {
    97         } catch (IOException ioe) {
    71                 FileDispatcherImpl.closeIntFD(fd0);
    98             EPoll.freePollArray(pollArrayAddress);
    72             } catch (IOException ioe0) {
    99             FileDispatcherImpl.closeIntFD(epfd);
    73                 t.addSuppressed(ioe0);
   100             throw ioe;
    74             }
   101         }
    75             try {
   102 
    76                 FileDispatcherImpl.closeIntFD(fd1);
    103         // register the read end of the pipe for wakeups
    77             } catch (IOException ioe1) {
   104         EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
    78                 t.addSuppressed(ioe1);
       
    79             }
       
    80             throw t;
       
    81         }
       
    82     }
   105     }
    83 
   106 
    84     private void ensureOpen() {
   107     private void ensureOpen() {
    85         if (closed)
   108         if (!isOpen())
    86             throw new ClosedSelectorException();
   109             throw new ClosedSelectorException();
    87     }
   110     }
    88 
   111 
    89     @Override
   112     @Override
    90     protected int doSelect(long timeout) throws IOException {
   113     protected int doSelect(long timeout) throws IOException {
    91         ensureOpen();
   114         assert Thread.holdsLock(this);
       
   115 
    92         int numEntries;
   116         int numEntries;
       
   117         processUpdateQueue();
    93         processDeregisterQueue();
   118         processDeregisterQueue();
    94         try {
   119         try {
    95             begin();
   120             begin();
    96             numEntries = pollWrapper.poll(timeout);
   121 
       
   122             // epoll_wait timeout is int
       
   123             int to = (int) Math.min(timeout, Integer.MAX_VALUE);
       
   124             boolean timedPoll = (to > 0);
       
   125             do {
       
   126                 long startTime = timedPoll ? System.nanoTime() : 0;
       
   127                 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
       
   128                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
       
   129                     // timed poll interrupted so need to adjust timeout
       
   130                     long adjust = System.nanoTime() - startTime;
       
   131                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
       
   132                     if (to <= 0) {
       
   133                         // timeout expired so no retry
       
   134                         numEntries = 0;
       
   135                     }
       
   136                 }
       
   137             } while (numEntries == IOStatus.INTERRUPTED);
       
   138             assert IOStatus.check(numEntries);
       
   139 
    97         } finally {
   140         } finally {
    98             end();
   141             end();
    99         }
   142         }
   100         processDeregisterQueue();
   143         processDeregisterQueue();
   101         return updateSelectedKeys(numEntries);
   144         return updateSelectedKeys(numEntries);
       
   145     }
       
   146 
       
   147     /**
       
   148      * Process new registrations and changes to the interest ops.
       
   149      */
       
   150     private void processUpdateQueue() {
       
   151         assert Thread.holdsLock(this);
       
   152 
       
   153         synchronized (updateLock) {
       
   154             SelectionKeyImpl ski;
       
   155 
       
   156             // new registrations
       
   157             while ((ski = newKeys.pollFirst()) != null) {
       
   158                 if (ski.isValid()) {
       
   159                     SelChImpl ch = ski.channel;
       
   160                     int fd = ch.getFDVal();
       
   161                     SelectionKeyImpl previous = fdToKey.put(fd, ski);
       
   162                     assert previous == null;
       
   163                     assert registered.get(fd) == false;
       
   164                 }
       
   165             }
       
   166 
       
   167             // changes to interest ops
       
   168             assert updateKeys.size() == updateOps.size();
       
   169             while ((ski = updateKeys.pollFirst()) != null) {
       
   170                 int ops = updateOps.pollFirst();
       
   171                 int fd = ski.channel.getFDVal();
       
   172                 if (ski.isValid() && fdToKey.containsKey(fd)) {
       
   173                     if (registered.get(fd)) {
       
   174                         if (ops == 0) {
       
   175                             // remove from epoll
       
   176                             EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
       
   177                             registered.clear(fd);
       
   178                         } else {
       
   179                             // modify events
       
   180                             EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, ops);
       
   181                         }
       
   182                     } else if (ops != 0) {
       
   183                         // add to epoll
       
   184                         EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, ops);
       
   185                         registered.set(fd);
       
   186                     }
       
   187                 }
       
   188             }
       
   189         }
   102     }
   190     }
   103 
   191 
   104     /**
   192     /**
   105      * Update the keys whose fd's have been selected by the epoll.
   193      * Update the keys whose fd's have been selected by the epoll.
   106      * Add the ready keys to the ready queue.
   194      * Add the ready keys to the ready queue.
   107      */
   195      */
   108     private int updateSelectedKeys(int numEntries) throws IOException {
   196     private int updateSelectedKeys(int numEntries) throws IOException {
       
   197         assert Thread.holdsLock(this);
       
   198         assert Thread.holdsLock(nioSelectedKeys());
       
   199 
   109         boolean interrupted = false;
   200         boolean interrupted = false;
   110         int numKeysUpdated = 0;
   201         int numKeysUpdated = 0;
   111         for (int i=0; i<numEntries; i++) {
   202         for (int i=0; i<numEntries; i++) {
   112             int nextFD = pollWrapper.getDescriptor(i);
   203             long event = EPoll.getEvent(pollArrayAddress, i);
   113             if (nextFD == fd0) {
   204             int fd = EPoll.getDescriptor(event);
       
   205             if (fd == fd0) {
   114                 interrupted = true;
   206                 interrupted = true;
   115             } else {
   207             } else {
   116                 SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
   208                 SelectionKeyImpl ski = fdToKey.get(fd);
   117                 if (ski != null) {
   209                 if (ski != null) {
   118                     int rOps = pollWrapper.getEventOps(i);
   210                     int rOps = EPoll.getEvents(event);
   119                     if (selectedKeys.contains(ski)) {
   211                     if (selectedKeys.contains(ski)) {
   120                         if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
   212                         if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
   121                             numKeysUpdated++;
   213                             numKeysUpdated++;
   122                         }
   214                         }
   123                     } else {
   215                     } else {
   138         return numKeysUpdated;
   230         return numKeysUpdated;
   139     }
   231     }
   140 
   232 
   141     @Override
   233     @Override
   142     protected void implClose() throws IOException {
   234     protected void implClose() throws IOException {
   143         if (closed)
   235         assert Thread.holdsLock(this);
   144             return;
   236         assert Thread.holdsLock(nioKeys());
   145         closed = true;
       
   146 
   237 
   147         // prevent further wakeup
   238         // prevent further wakeup
   148         synchronized (interruptLock) {
   239         synchronized (interruptLock) {
   149             interruptTriggered = true;
   240             interruptTriggered = true;
   150         }
   241         }
   151 
   242 
   152         pollWrapper.close();
   243         FileDispatcherImpl.closeIntFD(epfd);
       
   244         EPoll.freePollArray(pollArrayAddress);
       
   245 
   153         FileDispatcherImpl.closeIntFD(fd0);
   246         FileDispatcherImpl.closeIntFD(fd0);
   154         FileDispatcherImpl.closeIntFD(fd1);
   247         FileDispatcherImpl.closeIntFD(fd1);
   155 
   248 
   156         // Deregister channels
   249         // Deregister channels
   157         Iterator<SelectionKey> i = keys.iterator();
   250         Iterator<SelectionKey> i = keys.iterator();
   165         }
   258         }
   166     }
   259     }
   167 
   260 
   168     @Override
   261     @Override
   169     protected void implRegister(SelectionKeyImpl ski) {
   262     protected void implRegister(SelectionKeyImpl ski) {
       
   263         assert Thread.holdsLock(nioKeys());
   170         ensureOpen();
   264         ensureOpen();
   171         SelChImpl ch = ski.channel;
   265         synchronized (updateLock) {
   172         int fd = Integer.valueOf(ch.getFDVal());
   266             newKeys.addLast(ski);
   173         fdToKey.put(fd, ski);
   267         }
   174         pollWrapper.add(fd);
       
   175         keys.add(ski);
   268         keys.add(ski);
   176     }
   269     }
   177 
   270 
   178     @Override
   271     @Override
   179     protected void implDereg(SelectionKeyImpl ski) throws IOException {
   272     protected void implDereg(SelectionKeyImpl ski) throws IOException {
   180         assert (ski.getIndex() >= 0);
   273         assert !ski.isValid();
   181         SelChImpl ch = ski.channel;
   274         assert Thread.holdsLock(this);
   182         int fd = ch.getFDVal();
   275         assert Thread.holdsLock(nioKeys());
   183         fdToKey.remove(Integer.valueOf(fd));
   276         assert Thread.holdsLock(nioSelectedKeys());
   184         pollWrapper.remove(fd);
   277 
   185         ski.setIndex(-1);
   278         int fd = ski.channel.getFDVal();
       
   279         fdToKey.remove(fd);
       
   280         if (registered.get(fd)) {
       
   281             EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
       
   282             registered.clear(fd);
       
   283         }
       
   284 
       
   285         selectedKeys.remove(ski);
   186         keys.remove(ski);
   286         keys.remove(ski);
   187         selectedKeys.remove(ski);
   287 
       
   288         // remove from channel's key set
   188         deregister(ski);
   289         deregister(ski);
       
   290 
   189         SelectableChannel selch = ski.channel();
   291         SelectableChannel selch = ski.channel();
   190         if (!selch.isOpen() && !selch.isRegistered())
   292         if (!selch.isOpen() && !selch.isRegistered())
   191             ((SelChImpl)selch).kill();
   293             ((SelChImpl) selch).kill();
   192     }
   294     }
   193 
   295 
   194     @Override
   296     @Override
   195     public void putEventOps(SelectionKeyImpl ski, int ops) {
   297     public void putEventOps(SelectionKeyImpl ski, int ops) {
   196         ensureOpen();
   298         ensureOpen();
   197         SelChImpl ch = ski.channel;
   299         synchronized (updateLock) {
   198         pollWrapper.setInterest(ch.getFDVal(), ops);
   300             updateOps.addLast(ops);   // ops first in case adding the key fails
       
   301             updateKeys.addLast(ski);
       
   302         }
   199     }
   303     }
   200 
   304 
   201     @Override
   305     @Override
   202     public Selector wakeup() {
   306     public Selector wakeup() {
   203         synchronized (interruptLock) {
   307         synchronized (interruptLock) {
   204             if (!interruptTriggered) {
   308             if (!interruptTriggered) {
   205                 pollWrapper.interrupt();
   309                 try {
       
   310                     IOUtil.write1(fd1, (byte)0);
       
   311                 } catch (IOException ioe) {
       
   312                     throw new InternalError(ioe);
       
   313                 }
   206                 interruptTriggered = true;
   314                 interruptTriggered = true;
   207             }
   315             }
   208         }
   316         }
   209         return this;
   317         return this;
   210     }
   318     }