src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
changeset 49493 814bd31f8da0
parent 49248 15a0e60c8b97
child 49526 cad4c844902a
equal deleted inserted replaced
49492:f1a8ec1a6972 49493:814bd31f8da0
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    22  * or visit www.oracle.com if you need additional information or have any
    22  * or visit www.oracle.com if you need additional information or have any
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
    25 
    26 /*
       
    27  */
       
    28 
       
    29 
       
    30 package sun.nio.ch;
    26 package sun.nio.ch;
    31 
    27 
    32 import java.nio.channels.spi.SelectorProvider;
    28 import java.io.IOException;
    33 import java.nio.channels.Selector;
       
    34 import java.nio.channels.ClosedSelectorException;
    29 import java.nio.channels.ClosedSelectorException;
    35 import java.nio.channels.Pipe;
    30 import java.nio.channels.Pipe;
    36 import java.nio.channels.SelectableChannel;
    31 import java.nio.channels.Selector;
    37 import java.io.IOException;
    32 import java.nio.channels.spi.SelectorProvider;
    38 import java.nio.channels.CancelledKeyException;
    33 import java.util.ArrayDeque;
       
    34 import java.util.ArrayList;
       
    35 import java.util.Deque;
       
    36 import java.util.HashMap;
    39 import java.util.List;
    37 import java.util.List;
    40 import java.util.ArrayList;
    38 import java.util.Map;
    41 import java.util.HashMap;
       
    42 import java.util.Iterator;
       
    43 
    39 
    44 /**
    40 /**
    45  * A multi-threaded implementation of Selector for Windows.
    41  * A multi-threaded implementation of Selector for Windows.
    46  *
    42  *
    47  * @author Konstantin Kladko
    43  * @author Konstantin Kladko
    78     private final Pipe wakeupPipe;
    74     private final Pipe wakeupPipe;
    79 
    75 
    80     // File descriptors corresponding to source and sink
    76     // File descriptors corresponding to source and sink
    81     private final int wakeupSourceFd, wakeupSinkFd;
    77     private final int wakeupSourceFd, wakeupSinkFd;
    82 
    78 
    83     // Lock for close cleanup
       
    84     private final Object closeLock = new Object();
       
    85 
       
    86     // Maps file descriptors to their indices in  pollArray
    79     // Maps file descriptors to their indices in  pollArray
    87     private static final class FdMap extends HashMap<Integer, MapEntry> {
    80     private static final class FdMap extends HashMap<Integer, MapEntry> {
    88         static final long serialVersionUID = 0L;
    81         static final long serialVersionUID = 0L;
    89         private MapEntry get(int desc) {
    82         private MapEntry get(int desc) {
    90             return get(Integer.valueOf(desc));
    83             return get(Integer.valueOf(desc));
   101         }
    94         }
   102     }
    95     }
   103 
    96 
   104     // class for fdMap entries
    97     // class for fdMap entries
   105     private static final class MapEntry {
    98     private static final class MapEntry {
   106         SelectionKeyImpl ski;
    99         final SelectionKeyImpl ski;
   107         long updateCount = 0;
   100         long updateCount = 0;
   108         long clearedCount = 0;
   101         long clearedCount = 0;
   109         MapEntry(SelectionKeyImpl ski) {
   102         MapEntry(SelectionKeyImpl ski) {
   110             this.ski = ski;
   103             this.ski = ski;
   111         }
   104         }
   118     private long timeout; //timeout for poll
   111     private long timeout; //timeout for poll
   119 
   112 
   120     // Lock for interrupt triggering and clearing
   113     // Lock for interrupt triggering and clearing
   121     private final Object interruptLock = new Object();
   114     private final Object interruptLock = new Object();
   122     private volatile boolean interruptTriggered;
   115     private volatile boolean interruptTriggered;
       
   116 
       
   117     // pending new registrations/updates, queued by implRegister and putEventOps
       
   118     private final Object updateLock = new Object();
       
   119     private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
       
   120     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
       
   121     private final Deque<Integer> updateEvents = new ArrayDeque<>();
       
   122 
   123 
   123 
   124     WindowsSelectorImpl(SelectorProvider sp) throws IOException {
   124     WindowsSelectorImpl(SelectorProvider sp) throws IOException {
   125         super(sp);
   125         super(sp);
   126         pollWrapper = new PollArrayWrapper(INIT_CAP);
   126         pollWrapper = new PollArrayWrapper(INIT_CAP);
   127         wakeupPipe = Pipe.open();
   127         wakeupPipe = Pipe.open();
   133         wakeupSinkFd = ((SelChImpl)sink).getFDVal();
   133         wakeupSinkFd = ((SelChImpl)sink).getFDVal();
   134 
   134 
   135         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
   135         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
   136     }
   136     }
   137 
   137 
       
   138     private void ensureOpen() {
       
   139         if (!isOpen())
       
   140             throw new ClosedSelectorException();
       
   141     }
       
   142 
   138     @Override
   143     @Override
   139     protected int doSelect(long timeout) throws IOException {
   144     protected int doSelect(long timeout) throws IOException {
   140         if (channelArray == null)
   145         assert Thread.holdsLock(this);
   141             throw new ClosedSelectorException();
       
   142         this.timeout = timeout; // set selector timeout
   146         this.timeout = timeout; // set selector timeout
       
   147         processUpdateQueue();
   143         processDeregisterQueue();
   148         processDeregisterQueue();
   144         if (interruptTriggered) {
   149         if (interruptTriggered) {
   145             resetWakeupSocket();
   150             resetWakeupSocket();
   146             return 0;
   151             return 0;
   147         }
   152         }
   172         processDeregisterQueue();
   177         processDeregisterQueue();
   173         int updated = updateSelectedKeys();
   178         int updated = updateSelectedKeys();
   174         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
   179         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
   175         resetWakeupSocket();
   180         resetWakeupSocket();
   176         return updated;
   181         return updated;
       
   182     }
       
   183 
       
   184     /**
       
   185      * Process new registrations and changes to the interest ops.
       
   186      */
       
   187     private void processUpdateQueue() {
       
   188         assert Thread.holdsLock(this);
       
   189 
       
   190         synchronized (updateLock) {
       
   191             SelectionKeyImpl ski;
       
   192 
       
   193             // new registrations
       
   194             while ((ski = newKeys.pollFirst()) != null) {
       
   195                 if (ski.isValid()) {
       
   196                     growIfNeeded();
       
   197                     channelArray[totalChannels] = ski;
       
   198                     ski.setIndex(totalChannels);
       
   199                     pollWrapper.putEntry(totalChannels, ski);
       
   200                     totalChannels++;
       
   201                     MapEntry previous = fdMap.put(ski);
       
   202                     assert previous == null;
       
   203                 }
       
   204             }
       
   205 
       
   206             // changes to interest ops
       
   207             assert updateKeys.size() == updateEvents.size();
       
   208             while ((ski = updateKeys.pollFirst()) != null) {
       
   209                 int events = updateEvents.pollFirst();
       
   210                 int fd = ski.channel.getFDVal();
       
   211                 if (ski.isValid() && fdMap.containsKey(fd)) {
       
   212                     int index = ski.getIndex();
       
   213                     assert index >= 0 && index < totalChannels;
       
   214                     pollWrapper.putEventOps(index, events);
       
   215                 }
       
   216             }
       
   217         }
   177     }
   218     }
   178 
   219 
   179     // Helper threads wait on this lock for the next poll.
   220     // Helper threads wait on this lock for the next poll.
   180     private final StartLock startLock = new StartLock();
   221     private final StartLock startLock = new StartLock();
   181 
   222 
   501         return numKeysUpdated;
   542         return numKeysUpdated;
   502     }
   543     }
   503 
   544 
   504     @Override
   545     @Override
   505     protected void implClose() throws IOException {
   546     protected void implClose() throws IOException {
   506         synchronized (closeLock) {
   547         assert !isOpen();
   507             if (channelArray != null) {
   548         assert Thread.holdsLock(this);
   508                 if (pollWrapper != null) {
   549 
   509                     // prevent further wakeup
   550         // prevent further wakeup
   510                     synchronized (interruptLock) {
   551         synchronized (interruptLock) {
   511                         interruptTriggered = true;
   552             interruptTriggered = true;
   512                     }
   553         }
   513                     wakeupPipe.sink().close();
   554 
   514                     wakeupPipe.source().close();
   555         wakeupPipe.sink().close();
   515                     for(int i = 1; i < totalChannels; i++) { // Deregister channels
   556         wakeupPipe.source().close();
   516                         if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
   557         pollWrapper.free();
   517                             deregister(channelArray[i]);
   558 
   518                             SelectableChannel selch = channelArray[i].channel();
   559         // Make all remaining helper threads exit
   519                             if (!selch.isOpen() && !selch.isRegistered())
   560         for (SelectThread t: threads)
   520                                 ((SelChImpl)selch).kill();
   561              t.makeZombie();
   521                         }
   562         startLock.startThreads();
   522                     }
   563     }
   523                     pollWrapper.free();
   564 
   524                     pollWrapper = null;
   565     @Override
   525                     channelArray = null;
       
   526                     // Make all remaining helper threads exit
       
   527                     for (SelectThread t: threads)
       
   528                          t.makeZombie();
       
   529                     startLock.startThreads();
       
   530                 }
       
   531             }
       
   532         }
       
   533     }
       
   534 
       
   535     protected void implRegister(SelectionKeyImpl ski) {
   566     protected void implRegister(SelectionKeyImpl ski) {
   536         synchronized (closeLock) {
   567         ensureOpen();
   537             if (pollWrapper == null)
   568         synchronized (updateLock) {
   538                 throw new ClosedSelectorException();
   569             newKeys.addLast(ski);
   539             growIfNeeded();
       
   540             channelArray[totalChannels] = ski;
       
   541             ski.setIndex(totalChannels);
       
   542             fdMap.put(ski);
       
   543             keys.add(ski);
       
   544             pollWrapper.addEntry(totalChannels, ski);
       
   545             totalChannels++;
       
   546         }
   570         }
   547     }
   571     }
   548 
   572 
   549     private void growIfNeeded() {
   573     private void growIfNeeded() {
   550         if (channelArray.length == totalChannels) {
   574         if (channelArray.length == totalChannels) {
   559             totalChannels++;
   583             totalChannels++;
   560             threadsCount++;
   584             threadsCount++;
   561         }
   585         }
   562     }
   586     }
   563 
   587 
   564     protected void implDereg(SelectionKeyImpl ski) throws IOException{
   588     @Override
   565         int i = ski.getIndex();
   589     protected void implDereg(SelectionKeyImpl ski) {
   566         assert (i >= 0);
   590         assert !ski.isValid();
   567         synchronized (closeLock) {
   591         assert Thread.holdsLock(this);
       
   592 
       
   593         if (fdMap.remove(ski) != null) {
       
   594             int i = ski.getIndex();
       
   595             assert (i >= 0);
       
   596 
   568             if (i != totalChannels - 1) {
   597             if (i != totalChannels - 1) {
   569                 // Copy end one over it
   598                 // Copy end one over it
   570                 SelectionKeyImpl endChannel = channelArray[totalChannels-1];
   599                 SelectionKeyImpl endChannel = channelArray[totalChannels-1];
   571                 channelArray[i] = endChannel;
   600                 channelArray[i] = endChannel;
   572                 endChannel.setIndex(i);
   601                 endChannel.setIndex(i);
   573                 pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
   602                 pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
   574                                                                 pollWrapper, i);
       
   575             }
   603             }
   576             ski.setIndex(-1);
   604             ski.setIndex(-1);
   577         }
   605 
   578         channelArray[totalChannels - 1] = null;
   606             channelArray[totalChannels - 1] = null;
   579         totalChannels--;
       
   580         if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
       
   581             totalChannels--;
   607             totalChannels--;
   582             threadsCount--; // The last thread has become redundant.
   608             if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
   583         }
   609                 totalChannels--;
   584         fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
   610                 threadsCount--; // The last thread has become redundant.
   585         keys.remove(ski);
   611             }
   586         selectedKeys.remove(ski);
   612         }
   587         deregister(ski);
   613     }
   588         SelectableChannel selch = ski.channel();
   614 
   589         if (!selch.isOpen() && !selch.isRegistered())
   615     @Override
   590             ((SelChImpl)selch).kill();
   616     public void putEventOps(SelectionKeyImpl ski, int events) {
   591     }
   617         ensureOpen();
   592 
   618         synchronized (updateLock) {
   593     public void putEventOps(SelectionKeyImpl sk, int ops) {
   619             updateEvents.addLast(events);  // events first in case adding key fails
   594         synchronized (closeLock) {
   620             updateKeys.addLast(ski);
   595             if (pollWrapper == null)
   621         }
   596                 throw new ClosedSelectorException();
   622     }
   597             // make sure this sk has not been removed yet
   623 
   598             int index = sk.getIndex();
   624     @Override
   599             if (index == -1)
       
   600                 throw new CancelledKeyException();
       
   601             pollWrapper.putEventOps(index, ops);
       
   602         }
       
   603     }
       
   604 
       
   605     public Selector wakeup() {
   625     public Selector wakeup() {
   606         synchronized (interruptLock) {
   626         synchronized (interruptLock) {
   607             if (!interruptTriggered) {
   627             if (!interruptTriggered) {
   608                 setWakeupSocket();
   628                 setWakeupSocket();
   609                 interruptTriggered = true;
   629                 interruptTriggered = true;