src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java
changeset 49417 1d3139252c1c
parent 49290 07779973cbe2
child 49493 814bd31f8da0
equal deleted inserted replaced
49416:f14852315495 49417:1d3139252c1c
    20  *
    20  *
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    22  * or visit www.oracle.com if you need additional information or have any
    22  * or visit www.oracle.com if you need additional information or have any
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
       
    26 package sun.nio.ch;
    25 package sun.nio.ch;
    27 
    26 
    28 import java.io.IOException;
    27 import java.io.IOException;
    29 import java.nio.channels.*;
    28 import java.nio.channels.ClosedSelectorException;
    30 import java.nio.channels.spi.*;
    29 import java.nio.channels.SelectableChannel;
    31 import java.util.*;
    30 import java.nio.channels.SelectionKey;
    32 
    31 import java.nio.channels.Selector;
       
    32 import java.nio.channels.spi.SelectorProvider;
       
    33 import java.util.ArrayDeque;
       
    34 import java.util.ArrayList;
       
    35 import java.util.Deque;
       
    36 import java.util.Iterator;
       
    37 import java.util.List;
       
    38 import java.util.concurrent.TimeUnit;
       
    39 
       
    40 import jdk.internal.misc.Unsafe;
    33 
    41 
    34 /**
    42 /**
    35  * An implementation of Selector for Solaris.
    43  * Selector implementation based on poll
    36  */
    44  */
    37 
    45 
    38 class PollSelectorImpl
    46 class PollSelectorImpl extends SelectorImpl {
    39     extends AbstractPollSelectorImpl
    47 
    40 {
    48     // initial capacity of poll array
    41 
    49     private static final int INITIAL_CAPACITY = 16;
    42     // File descriptors used for interrupt
    50 
    43     private int fd0;
    51     // poll array, grows as needed
    44     private int fd1;
    52     private int pollArrayCapacity = INITIAL_CAPACITY;
    45 
    53     private int pollArraySize;
    46     // Lock for interrupt triggering and clearing
    54     private AllocatedNativeObject pollArray;
    47     private Object interruptLock = new Object();
    55 
    48     private boolean interruptTriggered = false;
    56     // file descriptors used for interrupt
    49 
    57     private final int fd0;
    50     /**
    58     private final int fd1;
    51      * Package private constructor called by factory method in
    59 
    52      * the abstract superclass Selector.
    60     // keys for file descriptors in poll array, synchronize on selector
    53      */
    61     private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
       
    62 
       
    63     // pending updates, queued by putEventOps
       
    64     private final Object updateLock = new Object();
       
    65     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
       
    66     private final Deque<Integer> updateOps = new ArrayDeque<>();
       
    67 
       
    68     // interrupt triggering and clearing
       
    69     private final Object interruptLock = new Object();
       
    70     private boolean interruptTriggered;
       
    71 
    54     PollSelectorImpl(SelectorProvider sp) throws IOException {
    72     PollSelectorImpl(SelectorProvider sp) throws IOException {
    55         super(sp, 1, 1);
    73         super(sp);
    56         long pipeFds = IOUtil.makePipe(false);
    74 
    57         fd0 = (int) (pipeFds >>> 32);
    75         int size = pollArrayCapacity * SIZE_POLLFD;
    58         fd1 = (int) pipeFds;
    76         this.pollArray = new AllocatedNativeObject(size, false);
       
    77 
    59         try {
    78         try {
    60             pollWrapper = new PollArrayWrapper(INIT_CAP);
    79             long fds = IOUtil.makePipe(false);
    61             pollWrapper.initInterrupt(fd0, fd1);
    80             this.fd0 = (int) (fds >>> 32);
    62             channelArray = new SelectionKeyImpl[INIT_CAP];
    81             this.fd1 = (int) fds;
    63         } catch (Throwable t) {
    82         } catch (IOException ioe) {
    64             try {
    83             pollArray.free();
    65                 FileDispatcherImpl.closeIntFD(fd0);
    84             throw ioe;
    66             } catch (IOException ioe0) {
    85         }
    67                 t.addSuppressed(ioe0);
    86 
    68             }
    87         // wakeup support
    69             try {
    88         synchronized (this) {
    70                 FileDispatcherImpl.closeIntFD(fd1);
    89             setFirst(fd0, Net.POLLIN);
    71             } catch (IOException ioe1) {
    90         }
    72                 t.addSuppressed(ioe1);
    91     }
    73             }
    92 
    74             throw t;
    93     private void ensureOpen() {
    75         }
    94         if (!isOpen())
    76     }
       
    77 
       
    78     protected int doSelect(long timeout)
       
    79         throws IOException
       
    80     {
       
    81         if (channelArray == null)
       
    82             throw new ClosedSelectorException();
    95             throw new ClosedSelectorException();
       
    96     }
       
    97 
       
    98     @Override
       
    99     protected int doSelect(long timeout) throws IOException {
       
   100         assert Thread.holdsLock(this);
       
   101 
       
   102         processUpdateQueue();
    83         processDeregisterQueue();
   103         processDeregisterQueue();
    84         try {
   104         try {
    85             begin();
   105             begin();
    86             pollWrapper.poll(totalChannels, 0, timeout);
   106 
       
   107             int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout
       
   108             boolean timedPoll = (to > 0);
       
   109             int numPolled;
       
   110             do {
       
   111                 long startTime = timedPoll ? System.nanoTime() : 0;
       
   112                 numPolled = poll(pollArray.address(), pollArraySize, to);
       
   113                 if (numPolled == IOStatus.INTERRUPTED && timedPoll) {
       
   114                     // timed poll interrupted so need to adjust timeout
       
   115                     long adjust = System.nanoTime() - startTime;
       
   116                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
       
   117                     if (to <= 0) {
       
   118                         // timeout expired so no retry
       
   119                         numPolled = 0;
       
   120                     }
       
   121                 }
       
   122             } while (numPolled == IOStatus.INTERRUPTED);
       
   123             assert numPolled <= pollArraySize;
       
   124 
    87         } finally {
   125         } finally {
    88             end();
   126             end();
    89         }
   127         }
       
   128 
    90         processDeregisterQueue();
   129         processDeregisterQueue();
    91         int numKeysUpdated = updateSelectedKeys();
   130         return updateSelectedKeys();
    92         if (pollWrapper.getReventOps(0) != 0) {
   131     }
    93             // Clear the wakeup pipe
   132 
    94             pollWrapper.putReventOps(0, 0);
   133     /**
    95             synchronized (interruptLock) {
   134      * Process changes to the interest ops.
    96                 IOUtil.drain(fd0);
   135      */
    97                 interruptTriggered = false;
   136     private void processUpdateQueue() {
       
   137         assert Thread.holdsLock(this);
       
   138 
       
   139         synchronized (updateLock) {
       
   140             assert updateKeys.size() == updateOps.size();
       
   141 
       
   142             SelectionKeyImpl ski;
       
   143             while ((ski = updateKeys.pollFirst()) != null) {
       
   144                 int ops = updateOps.pollFirst();
       
   145                 if (ski.isValid()) {
       
   146                     int index = ski.getIndex();
       
   147                     assert index >= 0 && index < pollArraySize;
       
   148                     if (index > 0) {
       
   149                         assert pollKeys.get(index) == ski;
       
   150                         if (ops == 0) {
       
   151                             remove(ski);
       
   152                         } else {
       
   153                             update(ski, ops);
       
   154                         }
       
   155                     } else if (ops != 0) {
       
   156                         add(ski, ops);
       
   157                     }
       
   158                 }
    98             }
   159             }
    99         }
   160         }
       
   161     }
       
   162 
       
   163     /**
       
   164      * Update the keys whose fd's have been selected by kqueue.
       
   165      * Add the ready keys to the selected key set.
       
   166      * If the interrupt fd has been selected, drain it and clear the interrupt.
       
   167      */
       
   168     private int updateSelectedKeys() throws IOException {
       
   169         assert Thread.holdsLock(this);
       
   170         assert Thread.holdsLock(nioSelectedKeys());
       
   171         assert pollArraySize > 0 && pollArraySize == pollKeys.size();
       
   172 
       
   173         int numKeysUpdated = 0;
       
   174         for (int i = 1; i < pollArraySize; i++) {
       
   175             int rOps = getReventOps(i);
       
   176             if (rOps != 0) {
       
   177                 SelectionKeyImpl ski = pollKeys.get(i);
       
   178                 assert ski.channel.getFDVal() == getDescriptor(i);
       
   179                 if (ski.isValid()) {
       
   180                     if (selectedKeys.contains(ski)) {
       
   181                         if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
       
   182                             numKeysUpdated++;
       
   183                         }
       
   184                     } else {
       
   185                         ski.channel.translateAndSetReadyOps(rOps, ski);
       
   186                         if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
       
   187                             selectedKeys.add(ski);
       
   188                             numKeysUpdated++;
       
   189                         }
       
   190                     }
       
   191                 }
       
   192             }
       
   193         }
       
   194 
       
   195         // check for interrupt
       
   196         if (getReventOps(0) != 0) {
       
   197             assert getDescriptor(0) == fd0;
       
   198             clearInterrupt();
       
   199         }
       
   200 
   100         return numKeysUpdated;
   201         return numKeysUpdated;
   101     }
   202     }
   102 
   203 
   103     protected void implCloseInterrupt() throws IOException {
   204     @Override
       
   205     protected void implClose() throws IOException {
       
   206         assert !isOpen();
       
   207         assert Thread.holdsLock(this);
       
   208         assert Thread.holdsLock(nioKeys());
       
   209 
   104         // prevent further wakeup
   210         // prevent further wakeup
   105         synchronized (interruptLock) {
   211         synchronized (interruptLock) {
   106             interruptTriggered = true;
   212             interruptTriggered = true;
   107         }
   213         }
       
   214 
       
   215         pollArray.free();
   108         FileDispatcherImpl.closeIntFD(fd0);
   216         FileDispatcherImpl.closeIntFD(fd0);
   109         FileDispatcherImpl.closeIntFD(fd1);
   217         FileDispatcherImpl.closeIntFD(fd1);
   110         fd0 = -1;
   218 
   111         fd1 = -1;
   219         // Deregister channels
   112         pollWrapper.release(0);
   220         Iterator<SelectionKey> i = keys.iterator();
   113     }
   221         while (i.hasNext()) {
   114 
   222             SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
       
   223             ski.setIndex(-1);
       
   224             deregister(ski);
       
   225             SelectableChannel selch = ski.channel();
       
   226             if (!selch.isOpen() && !selch.isRegistered())
       
   227                 ((SelChImpl)selch).kill();
       
   228             i.remove();
       
   229         }
       
   230     }
       
   231 
       
   232     @Override
       
   233     protected void implRegister(SelectionKeyImpl ski) {
       
   234         assert ski.getIndex() == 0;
       
   235         assert Thread.holdsLock(nioKeys());
       
   236 
       
   237         ensureOpen();
       
   238         keys.add(ski);
       
   239     }
       
   240 
       
   241     @Override
       
   242     protected void implDereg(SelectionKeyImpl ski) throws IOException {
       
   243         assert !ski.isValid();
       
   244         assert Thread.holdsLock(this);
       
   245         assert Thread.holdsLock(nioKeys());
       
   246         assert Thread.holdsLock(nioSelectedKeys());
       
   247 
       
   248         // remove from poll array
       
   249         int index = ski.getIndex();
       
   250         if (index > 0) {
       
   251             remove(ski);
       
   252         }
       
   253 
       
   254         // remove from selected-key and key set
       
   255         selectedKeys.remove(ski);
       
   256         keys.remove(ski);
       
   257 
       
   258         // remove from channel's key set
       
   259         deregister(ski);
       
   260 
       
   261         SelectableChannel selch = ski.channel();
       
   262         if (!selch.isOpen() && !selch.isRegistered())
       
   263             ((SelChImpl) selch).kill();
       
   264     }
       
   265 
       
   266     @Override
       
   267     public void putEventOps(SelectionKeyImpl ski, int ops) {
       
   268         ensureOpen();
       
   269         synchronized (updateLock) {
       
   270             updateOps.addLast(ops);   // ops first in case adding the key fails
       
   271             updateKeys.addLast(ski);
       
   272         }
       
   273     }
       
   274 
       
   275     @Override
   115     public Selector wakeup() {
   276     public Selector wakeup() {
   116         synchronized (interruptLock) {
   277         synchronized (interruptLock) {
   117             if (!interruptTriggered) {
   278             if (!interruptTriggered) {
   118                 pollWrapper.interrupt();
   279                 try {
       
   280                     IOUtil.write1(fd1, (byte)0);
       
   281                 } catch (IOException ioe) {
       
   282                     throw new InternalError(ioe);
       
   283                 }
   119                 interruptTriggered = true;
   284                 interruptTriggered = true;
   120             }
   285             }
   121         }
   286         }
   122         return this;
   287         return this;
   123     }
   288     }
   124 
   289 
       
   290     private void clearInterrupt() throws IOException {
       
   291         synchronized (interruptLock) {
       
   292             IOUtil.drain(fd0);
       
   293             interruptTriggered = false;
       
   294         }
       
   295     }
       
   296 
       
   297     /**
       
   298      * Sets the first pollfd enty in the poll array to the given fd
       
   299      */
       
   300     private void setFirst(int fd, int ops) {
       
   301         assert pollArraySize == 0;
       
   302         assert pollKeys.isEmpty();
       
   303 
       
   304         putDescriptor(0, fd);
       
   305         putEventOps(0, ops);
       
   306         pollArraySize = 1;
       
   307 
       
   308         pollKeys.add(null);  // dummy element
       
   309     }
       
   310 
       
   311     /**
       
   312      * Adds a pollfd entry to the poll array, expanding the poll array if needed.
       
   313      */
       
   314     private void add(SelectionKeyImpl ski, int ops) {
       
   315         expandIfNeeded();
       
   316 
       
   317         int index = pollArraySize;
       
   318         assert index > 0;
       
   319         putDescriptor(index, ski.channel.getFDVal());
       
   320         putEventOps(index, ops);
       
   321         putReventOps(index, 0);
       
   322         ski.setIndex(index);
       
   323         pollArraySize++;
       
   324 
       
   325         pollKeys.add(ski);
       
   326         assert pollKeys.size() == pollArraySize;
       
   327     }
       
   328 
       
   329     /**
       
   330      * Update the events of pollfd entry.
       
   331      */
       
   332     private void update(SelectionKeyImpl ski, int ops) {
       
   333         int index = ski.getIndex();
       
   334         assert index > 0 && index < pollArraySize;
       
   335         assert getDescriptor(index) == ski.channel.getFDVal();
       
   336         putEventOps(index, ops);
       
   337     }
       
   338 
       
   339     /**
       
   340      * Removes a pollfd entry from the poll array
       
   341      */
       
   342     private void remove(SelectionKeyImpl ski) {
       
   343         int index = ski.getIndex();
       
   344         assert index > 0 && index < pollArraySize;
       
   345         assert getDescriptor(index) == ski.channel.getFDVal();
       
   346 
       
   347         // replace pollfd at index with the last pollfd in array
       
   348         int lastIndex = pollArraySize - 1;
       
   349         if (lastIndex != index) {
       
   350             SelectionKeyImpl lastKey = pollKeys.get(lastIndex);
       
   351             assert lastKey.getIndex() == lastIndex;
       
   352             int lastFd = getDescriptor(lastIndex);
       
   353             int lastOps = getEventOps(lastIndex);
       
   354             int lastRevents = getReventOps(lastIndex);
       
   355             assert lastKey.channel.getFDVal() == lastFd;
       
   356             putDescriptor(index, lastFd);
       
   357             putEventOps(index, lastOps);
       
   358             putReventOps(index, lastRevents);
       
   359             pollKeys.set(index, lastKey);
       
   360             lastKey.setIndex(index);
       
   361         }
       
   362         pollKeys.remove(lastIndex);
       
   363         pollArraySize--;
       
   364         assert pollKeys.size() == pollArraySize;
       
   365 
       
   366         ski.setIndex(0);
       
   367     }
       
   368 
       
   369     /**
       
   370      * Expand poll array if at capacity
       
   371      */
       
   372     private void expandIfNeeded() {
       
   373         if (pollArraySize == pollArrayCapacity) {
       
   374             int oldSize = pollArrayCapacity * SIZE_POLLFD;
       
   375             int newCapacity = pollArrayCapacity + INITIAL_CAPACITY;
       
   376             int newSize = newCapacity * SIZE_POLLFD;
       
   377             AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false);
       
   378             Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize);
       
   379             pollArray.free();
       
   380             pollArray = newPollArray;
       
   381             pollArrayCapacity = newCapacity;
       
   382         }
       
   383     }
       
   384 
       
   385     private static final short SIZE_POLLFD   = 8;
       
   386     private static final short FD_OFFSET     = 0;
       
   387     private static final short EVENT_OFFSET  = 4;
       
   388     private static final short REVENT_OFFSET = 6;
       
   389 
       
   390     private void putDescriptor(int i, int fd) {
       
   391         int offset = SIZE_POLLFD * i + FD_OFFSET;
       
   392         pollArray.putInt(offset, fd);
       
   393     }
       
   394 
       
   395     private int getDescriptor(int i) {
       
   396         int offset = SIZE_POLLFD * i + FD_OFFSET;
       
   397         return pollArray.getInt(offset);
       
   398     }
       
   399 
       
   400     private void putEventOps(int i, int event) {
       
   401         int offset = SIZE_POLLFD * i + EVENT_OFFSET;
       
   402         pollArray.putShort(offset, (short)event);
       
   403     }
       
   404 
       
   405     private int getEventOps(int i) {
       
   406         int offset = SIZE_POLLFD * i + EVENT_OFFSET;
       
   407         return pollArray.getShort(offset);
       
   408     }
       
   409 
       
   410     private void putReventOps(int i, int revent) {
       
   411         int offset = SIZE_POLLFD * i + REVENT_OFFSET;
       
   412         pollArray.putShort(offset, (short)revent);
       
   413     }
       
   414 
       
   415     private int getReventOps(int i) {
       
   416         int offset = SIZE_POLLFD * i + REVENT_OFFSET;
       
   417         return pollArray.getShort(offset);
       
   418     }
       
   419 
       
   420     private static native int poll(long pollAddress, int numfds, int timeout);
       
   421 
       
   422     static {
       
   423         IOUtil.load();
       
   424     }
   125 }
   425 }