src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
changeset 47216 71c04702a3d5
parent 37521 b6e0f285c998
child 49248 15a0e60c8b97
equal deleted inserted replaced
47215:4ebc2e2fb97c 47216:71c04702a3d5
       
     1 /*
       
     2  * Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 /*
       
    27  */
       
    28 
       
    29 
       
    30 package sun.nio.ch;
       
    31 
       
    32 import java.nio.channels.spi.SelectorProvider;
       
    33 import java.nio.channels.Selector;
       
    34 import java.nio.channels.ClosedSelectorException;
       
    35 import java.nio.channels.Pipe;
       
    36 import java.nio.channels.SelectableChannel;
       
    37 import java.io.IOException;
       
    38 import java.nio.channels.CancelledKeyException;
       
    39 import java.util.List;
       
    40 import java.util.ArrayList;
       
    41 import java.util.HashMap;
       
    42 import java.util.Iterator;
       
    43 
       
    44 /**
       
    45  * A multi-threaded implementation of Selector for Windows.
       
    46  *
       
    47  * @author Konstantin Kladko
       
    48  * @author Mark Reinhold
       
    49  */
       
    50 
       
    51 final class WindowsSelectorImpl extends SelectorImpl {
       
    52     // Initial capacity of the poll array
       
    53     private final int INIT_CAP = 8;
       
    54     // Maximum number of sockets for select().
       
    55     // Should be INIT_CAP times a power of 2
       
    56     private static final int MAX_SELECTABLE_FDS = 1024;
       
    57 
       
    58     // The list of SelectableChannels serviced by this Selector. Every mod
       
    59     // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
       
    60     // array,  where the corresponding entry is occupied by the wakeupSocket
       
    61     private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
       
    62 
       
    63     // The global native poll array holds file decriptors and event masks
       
    64     private PollArrayWrapper pollWrapper;
       
    65 
       
    66     // The number of valid entries in  poll array, including entries occupied
       
    67     // by wakeup socket handle.
       
    68     private int totalChannels = 1;
       
    69 
       
    70     // Number of helper threads needed for select. We need one thread per
       
    71     // each additional set of MAX_SELECTABLE_FDS - 1 channels.
       
    72     private int threadsCount = 0;
       
    73 
       
    74     // A list of helper threads for select.
       
    75     private final List<SelectThread> threads = new ArrayList<SelectThread>();
       
    76 
       
    77     //Pipe used as a wakeup object.
       
    78     private final Pipe wakeupPipe;
       
    79 
       
    80     // File descriptors corresponding to source and sink
       
    81     private final int wakeupSourceFd, wakeupSinkFd;
       
    82 
       
    83     // Lock for close cleanup
       
    84     private Object closeLock = new Object();
       
    85 
       
    86     // Maps file descriptors to their indices in  pollArray
       
    87     private static final class FdMap extends HashMap<Integer, MapEntry> {
       
    88         static final long serialVersionUID = 0L;
       
    89         private MapEntry get(int desc) {
       
    90             return get(Integer.valueOf(desc));
       
    91         }
       
    92         private MapEntry put(SelectionKeyImpl ski) {
       
    93             return put(Integer.valueOf(ski.channel.getFDVal()), new MapEntry(ski));
       
    94         }
       
    95         private MapEntry remove(SelectionKeyImpl ski) {
       
    96             Integer fd = Integer.valueOf(ski.channel.getFDVal());
       
    97             MapEntry x = get(fd);
       
    98             if ((x != null) && (x.ski.channel == ski.channel))
       
    99                 return remove(fd);
       
   100             return null;
       
   101         }
       
   102     }
       
   103 
       
   104     // class for fdMap entries
       
   105     private static final class MapEntry {
       
   106         SelectionKeyImpl ski;
       
   107         long updateCount = 0;
       
   108         long clearedCount = 0;
       
   109         MapEntry(SelectionKeyImpl ski) {
       
   110             this.ski = ski;
       
   111         }
       
   112     }
       
   113     private final FdMap fdMap = new FdMap();
       
   114 
       
   115     // SubSelector for the main thread
       
   116     private final SubSelector subSelector = new SubSelector();
       
   117 
       
   118     private long timeout; //timeout for poll
       
   119 
       
   120     // Lock for interrupt triggering and clearing
       
   121     private final Object interruptLock = new Object();
       
   122     private volatile boolean interruptTriggered;
       
   123 
       
   124     WindowsSelectorImpl(SelectorProvider sp) throws IOException {
       
   125         super(sp);
       
   126         pollWrapper = new PollArrayWrapper(INIT_CAP);
       
   127         wakeupPipe = Pipe.open();
       
   128         wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
       
   129 
       
   130         // Disable the Nagle algorithm so that the wakeup is more immediate
       
   131         SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
       
   132         (sink.sc).socket().setTcpNoDelay(true);
       
   133         wakeupSinkFd = ((SelChImpl)sink).getFDVal();
       
   134 
       
   135         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
       
   136     }
       
   137 
       
   138     protected int doSelect(long timeout) throws IOException {
       
   139         if (channelArray == null)
       
   140             throw new ClosedSelectorException();
       
   141         this.timeout = timeout; // set selector timeout
       
   142         processDeregisterQueue();
       
   143         if (interruptTriggered) {
       
   144             resetWakeupSocket();
       
   145             return 0;
       
   146         }
       
   147         // Calculate number of helper threads needed for poll. If necessary
       
   148         // threads are created here and start waiting on startLock
       
   149         adjustThreadsCount();
       
   150         finishLock.reset(); // reset finishLock
       
   151         // Wakeup helper threads, waiting on startLock, so they start polling.
       
   152         // Redundant threads will exit here after wakeup.
       
   153         startLock.startThreads();
       
   154         // do polling in the main thread. Main thread is responsible for
       
   155         // first MAX_SELECTABLE_FDS entries in pollArray.
       
   156         try {
       
   157             begin();
       
   158             try {
       
   159                 subSelector.poll();
       
   160             } catch (IOException e) {
       
   161                 finishLock.setException(e); // Save this exception
       
   162             }
       
   163             // Main thread is out of poll(). Wakeup others and wait for them
       
   164             if (threads.size() > 0)
       
   165                 finishLock.waitForHelperThreads();
       
   166           } finally {
       
   167               end();
       
   168           }
       
   169         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
       
   170         finishLock.checkForException();
       
   171         processDeregisterQueue();
       
   172         int updated = updateSelectedKeys();
       
   173         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
       
   174         resetWakeupSocket();
       
   175         return updated;
       
   176     }
       
   177 
       
   178     // Helper threads wait on this lock for the next poll.
       
   179     private final StartLock startLock = new StartLock();
       
   180 
       
   181     private final class StartLock {
       
   182         // A variable which distinguishes the current run of doSelect from the
       
   183         // previous one. Incrementing runsCounter and notifying threads will
       
   184         // trigger another round of poll.
       
   185         private long runsCounter;
       
   186        // Triggers threads, waiting on this lock to start polling.
       
   187         private synchronized void startThreads() {
       
   188             runsCounter++; // next run
       
   189             notifyAll(); // wake up threads.
       
   190         }
       
   191         // This function is called by a helper thread to wait for the
       
   192         // next round of poll(). It also checks, if this thread became
       
   193         // redundant. If yes, it returns true, notifying the thread
       
   194         // that it should exit.
       
   195         private synchronized boolean waitForStart(SelectThread thread) {
       
   196             while (true) {
       
   197                 while (runsCounter == thread.lastRun) {
       
   198                     try {
       
   199                         startLock.wait();
       
   200                     } catch (InterruptedException e) {
       
   201                         Thread.currentThread().interrupt();
       
   202                     }
       
   203                 }
       
   204                 if (thread.isZombie()) { // redundant thread
       
   205                     return true; // will cause run() to exit.
       
   206                 } else {
       
   207                     thread.lastRun = runsCounter; // update lastRun
       
   208                     return false; //   will cause run() to poll.
       
   209                 }
       
   210             }
       
   211         }
       
   212     }
       
   213 
       
   214     // Main thread waits on this lock, until all helper threads are done
       
   215     // with poll().
       
   216     private final FinishLock finishLock = new FinishLock();
       
   217 
       
   218     private final class FinishLock  {
       
   219         // Number of helper threads, that did not finish yet.
       
   220         private int threadsToFinish;
       
   221 
       
   222         // IOException which occurred during the last run.
       
   223         IOException exception = null;
       
   224 
       
   225         // Called before polling.
       
   226         private void reset() {
       
   227             threadsToFinish = threads.size(); // helper threads
       
   228         }
       
   229 
       
   230         // Each helper thread invokes this function on finishLock, when
       
   231         // the thread is done with poll().
       
   232         private synchronized void threadFinished() {
       
   233             if (threadsToFinish == threads.size()) { // finished poll() first
       
   234                 // if finished first, wakeup others
       
   235                 wakeup();
       
   236             }
       
   237             threadsToFinish--;
       
   238             if (threadsToFinish == 0) // all helper threads finished poll().
       
   239                 notify();             // notify the main thread
       
   240         }
       
   241 
       
   242         // The main thread invokes this function on finishLock to wait
       
   243         // for helper threads to finish poll().
       
   244         private synchronized void waitForHelperThreads() {
       
   245             if (threadsToFinish == threads.size()) {
       
   246                 // no helper threads finished yet. Wakeup them up.
       
   247                 wakeup();
       
   248             }
       
   249             while (threadsToFinish != 0) {
       
   250                 try {
       
   251                     finishLock.wait();
       
   252                 } catch (InterruptedException e) {
       
   253                     // Interrupted - set interrupted state.
       
   254                     Thread.currentThread().interrupt();
       
   255                 }
       
   256             }
       
   257         }
       
   258 
       
   259         // sets IOException for this run
       
   260         private synchronized void setException(IOException e) {
       
   261             exception = e;
       
   262         }
       
   263 
       
   264         // Checks if there was any exception during the last run.
       
   265         // If yes, throws it
       
   266         private void checkForException() throws IOException {
       
   267             if (exception == null)
       
   268                 return;
       
   269             StringBuffer message =  new StringBuffer("An exception occurred" +
       
   270                                        " during the execution of select(): \n");
       
   271             message.append(exception);
       
   272             message.append('\n');
       
   273             exception = null;
       
   274             throw new IOException(message.toString());
       
   275         }
       
   276     }
       
   277 
       
   278     private final class SubSelector {
       
   279         private final int pollArrayIndex; // starting index in pollArray to poll
       
   280         // These arrays will hold result of native select().
       
   281         // The first element of each array is the number of selected sockets.
       
   282         // Other elements are file descriptors of selected sockets.
       
   283         private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
       
   284         private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
       
   285         private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
       
   286 
       
   287         private SubSelector() {
       
   288             this.pollArrayIndex = 0; // main thread
       
   289         }
       
   290 
       
   291         private SubSelector(int threadIndex) { // helper threads
       
   292             this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
       
   293         }
       
   294 
       
   295         private int poll() throws IOException{ // poll for the main thread
       
   296             return poll0(pollWrapper.pollArrayAddress,
       
   297                          Math.min(totalChannels, MAX_SELECTABLE_FDS),
       
   298                          readFds, writeFds, exceptFds, timeout);
       
   299         }
       
   300 
       
   301         private int poll(int index) throws IOException {
       
   302             // poll for helper threads
       
   303             return  poll0(pollWrapper.pollArrayAddress +
       
   304                      (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
       
   305                      Math.min(MAX_SELECTABLE_FDS,
       
   306                              totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
       
   307                      readFds, writeFds, exceptFds, timeout);
       
   308         }
       
   309 
       
   310         private native int poll0(long pollAddress, int numfds,
       
   311              int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
       
   312 
       
   313         private int processSelectedKeys(long updateCount) {
       
   314             int numKeysUpdated = 0;
       
   315             numKeysUpdated += processFDSet(updateCount, readFds,
       
   316                                            Net.POLLIN,
       
   317                                            false);
       
   318             numKeysUpdated += processFDSet(updateCount, writeFds,
       
   319                                            Net.POLLCONN |
       
   320                                            Net.POLLOUT,
       
   321                                            false);
       
   322             numKeysUpdated += processFDSet(updateCount, exceptFds,
       
   323                                            Net.POLLIN |
       
   324                                            Net.POLLCONN |
       
   325                                            Net.POLLOUT,
       
   326                                            true);
       
   327             return numKeysUpdated;
       
   328         }
       
   329 
       
   330         /**
       
   331          * Note, clearedCount is used to determine if the readyOps have
       
   332          * been reset in this select operation. updateCount is used to
       
   333          * tell if a key has been counted as updated in this select
       
   334          * operation.
       
   335          *
       
   336          * me.updateCount <= me.clearedCount <= updateCount
       
   337          */
       
   338         private int processFDSet(long updateCount, int[] fds, int rOps,
       
   339                                  boolean isExceptFds)
       
   340         {
       
   341             int numKeysUpdated = 0;
       
   342             for (int i = 1; i <= fds[0]; i++) {
       
   343                 int desc = fds[i];
       
   344                 if (desc == wakeupSourceFd) {
       
   345                     synchronized (interruptLock) {
       
   346                         interruptTriggered = true;
       
   347                     }
       
   348                     continue;
       
   349                 }
       
   350                 MapEntry me = fdMap.get(desc);
       
   351                 // If me is null, the key was deregistered in the previous
       
   352                 // processDeregisterQueue.
       
   353                 if (me == null)
       
   354                     continue;
       
   355                 SelectionKeyImpl sk = me.ski;
       
   356 
       
   357                 // The descriptor may be in the exceptfds set because there is
       
   358                 // OOB data queued to the socket. If there is OOB data then it
       
   359                 // is discarded and the key is not added to the selected set.
       
   360                 if (isExceptFds &&
       
   361                     (sk.channel() instanceof SocketChannelImpl) &&
       
   362                     discardUrgentData(desc))
       
   363                 {
       
   364                     continue;
       
   365                 }
       
   366 
       
   367                 if (selectedKeys.contains(sk)) { // Key in selected set
       
   368                     if (me.clearedCount != updateCount) {
       
   369                         if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
       
   370                             (me.updateCount != updateCount)) {
       
   371                             me.updateCount = updateCount;
       
   372                             numKeysUpdated++;
       
   373                         }
       
   374                     } else { // The readyOps have been set; now add
       
   375                         if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
       
   376                             (me.updateCount != updateCount)) {
       
   377                             me.updateCount = updateCount;
       
   378                             numKeysUpdated++;
       
   379                         }
       
   380                     }
       
   381                     me.clearedCount = updateCount;
       
   382                 } else { // Key is not in selected set yet
       
   383                     if (me.clearedCount != updateCount) {
       
   384                         sk.channel.translateAndSetReadyOps(rOps, sk);
       
   385                         if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
       
   386                             selectedKeys.add(sk);
       
   387                             me.updateCount = updateCount;
       
   388                             numKeysUpdated++;
       
   389                         }
       
   390                     } else { // The readyOps have been set; now add
       
   391                         sk.channel.translateAndUpdateReadyOps(rOps, sk);
       
   392                         if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
       
   393                             selectedKeys.add(sk);
       
   394                             me.updateCount = updateCount;
       
   395                             numKeysUpdated++;
       
   396                         }
       
   397                     }
       
   398                     me.clearedCount = updateCount;
       
   399                 }
       
   400             }
       
   401             return numKeysUpdated;
       
   402         }
       
   403     }
       
   404 
       
   405     // Represents a helper thread used for select.
       
   406     private final class SelectThread extends Thread {
       
   407         private final int index; // index of this thread
       
   408         final SubSelector subSelector;
       
   409         private long lastRun = 0; // last run number
       
   410         private volatile boolean zombie;
       
   411         // Creates a new thread
       
   412         private SelectThread(int i) {
       
   413             super(null, null, "SelectorHelper", 0, false);
       
   414             this.index = i;
       
   415             this.subSelector = new SubSelector(i);
       
   416             //make sure we wait for next round of poll
       
   417             this.lastRun = startLock.runsCounter;
       
   418         }
       
   419         void makeZombie() {
       
   420             zombie = true;
       
   421         }
       
   422         boolean isZombie() {
       
   423             return zombie;
       
   424         }
       
   425         public void run() {
       
   426             while (true) { // poll loop
       
   427                 // wait for the start of poll. If this thread has become
       
   428                 // redundant, then exit.
       
   429                 if (startLock.waitForStart(this))
       
   430                     return;
       
   431                 // call poll()
       
   432                 try {
       
   433                     subSelector.poll(index);
       
   434                 } catch (IOException e) {
       
   435                     // Save this exception and let other threads finish.
       
   436                     finishLock.setException(e);
       
   437                 }
       
   438                 // notify main thread, that this thread has finished, and
       
   439                 // wakeup others, if this thread is the first to finish.
       
   440                 finishLock.threadFinished();
       
   441             }
       
   442         }
       
   443     }
       
   444 
       
   445     // After some channels registered/deregistered, the number of required
       
   446     // helper threads may have changed. Adjust this number.
       
   447     private void adjustThreadsCount() {
       
   448         if (threadsCount > threads.size()) {
       
   449             // More threads needed. Start more threads.
       
   450             for (int i = threads.size(); i < threadsCount; i++) {
       
   451                 SelectThread newThread = new SelectThread(i);
       
   452                 threads.add(newThread);
       
   453                 newThread.setDaemon(true);
       
   454                 newThread.start();
       
   455             }
       
   456         } else if (threadsCount < threads.size()) {
       
   457             // Some threads become redundant. Remove them from the threads List.
       
   458             for (int i = threads.size() - 1 ; i >= threadsCount; i--)
       
   459                 threads.remove(i).makeZombie();
       
   460         }
       
   461     }
       
   462 
       
   463     // Sets Windows wakeup socket to a signaled state.
       
   464     private void setWakeupSocket() {
       
   465         setWakeupSocket0(wakeupSinkFd);
       
   466     }
       
   467     private native void setWakeupSocket0(int wakeupSinkFd);
       
   468 
       
   469     // Sets Windows wakeup socket to a non-signaled state.
       
   470     private void resetWakeupSocket() {
       
   471         synchronized (interruptLock) {
       
   472             if (interruptTriggered == false)
       
   473                 return;
       
   474             resetWakeupSocket0(wakeupSourceFd);
       
   475             interruptTriggered = false;
       
   476         }
       
   477     }
       
   478 
       
   479     private native void resetWakeupSocket0(int wakeupSourceFd);
       
   480 
       
   481     private native boolean discardUrgentData(int fd);
       
   482 
       
   483     // We increment this counter on each call to updateSelectedKeys()
       
   484     // each entry in  SubSelector.fdsMap has a memorized value of
       
   485     // updateCount. When we increment numKeysUpdated we set updateCount
       
   486     // for the corresponding entry to its current value. This is used to
       
   487     // avoid counting the same key more than once - the same key can
       
   488     // appear in readfds and writefds.
       
   489     private long updateCount = 0;
       
   490 
       
   491     // Update ops of the corresponding Channels. Add the ready keys to the
       
   492     // ready queue.
       
   493     private int updateSelectedKeys() {
       
   494         updateCount++;
       
   495         int numKeysUpdated = 0;
       
   496         numKeysUpdated += subSelector.processSelectedKeys(updateCount);
       
   497         for (SelectThread t: threads) {
       
   498             numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
       
   499         }
       
   500         return numKeysUpdated;
       
   501     }
       
   502 
       
   503     protected void implClose() throws IOException {
       
   504         synchronized (closeLock) {
       
   505             if (channelArray != null) {
       
   506                 if (pollWrapper != null) {
       
   507                     // prevent further wakeup
       
   508                     synchronized (interruptLock) {
       
   509                         interruptTriggered = true;
       
   510                     }
       
   511                     wakeupPipe.sink().close();
       
   512                     wakeupPipe.source().close();
       
   513                     for(int i = 1; i < totalChannels; i++) { // Deregister channels
       
   514                         if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
       
   515                             deregister(channelArray[i]);
       
   516                             SelectableChannel selch = channelArray[i].channel();
       
   517                             if (!selch.isOpen() && !selch.isRegistered())
       
   518                                 ((SelChImpl)selch).kill();
       
   519                         }
       
   520                     }
       
   521                     pollWrapper.free();
       
   522                     pollWrapper = null;
       
   523                     selectedKeys = null;
       
   524                     channelArray = null;
       
   525                     // Make all remaining helper threads exit
       
   526                     for (SelectThread t: threads)
       
   527                          t.makeZombie();
       
   528                     startLock.startThreads();
       
   529                 }
       
   530             }
       
   531         }
       
   532     }
       
   533 
       
   534     protected void implRegister(SelectionKeyImpl ski) {
       
   535         synchronized (closeLock) {
       
   536             if (pollWrapper == null)
       
   537                 throw new ClosedSelectorException();
       
   538             growIfNeeded();
       
   539             channelArray[totalChannels] = ski;
       
   540             ski.setIndex(totalChannels);
       
   541             fdMap.put(ski);
       
   542             keys.add(ski);
       
   543             pollWrapper.addEntry(totalChannels, ski);
       
   544             totalChannels++;
       
   545         }
       
   546     }
       
   547 
       
   548     private void growIfNeeded() {
       
   549         if (channelArray.length == totalChannels) {
       
   550             int newSize = totalChannels * 2; // Make a larger array
       
   551             SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
       
   552             System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
       
   553             channelArray = temp;
       
   554             pollWrapper.grow(newSize);
       
   555         }
       
   556         if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
       
   557             pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
       
   558             totalChannels++;
       
   559             threadsCount++;
       
   560         }
       
   561     }
       
   562 
       
   563     protected void implDereg(SelectionKeyImpl ski) throws IOException{
       
   564         int i = ski.getIndex();
       
   565         assert (i >= 0);
       
   566         synchronized (closeLock) {
       
   567             if (i != totalChannels - 1) {
       
   568                 // Copy end one over it
       
   569                 SelectionKeyImpl endChannel = channelArray[totalChannels-1];
       
   570                 channelArray[i] = endChannel;
       
   571                 endChannel.setIndex(i);
       
   572                 pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
       
   573                                                                 pollWrapper, i);
       
   574             }
       
   575             ski.setIndex(-1);
       
   576         }
       
   577         channelArray[totalChannels - 1] = null;
       
   578         totalChannels--;
       
   579         if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
       
   580             totalChannels--;
       
   581             threadsCount--; // The last thread has become redundant.
       
   582         }
       
   583         fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
       
   584         keys.remove(ski);
       
   585         selectedKeys.remove(ski);
       
   586         deregister(ski);
       
   587         SelectableChannel selch = ski.channel();
       
   588         if (!selch.isOpen() && !selch.isRegistered())
       
   589             ((SelChImpl)selch).kill();
       
   590     }
       
   591 
       
   592     public void putEventOps(SelectionKeyImpl sk, int ops) {
       
   593         synchronized (closeLock) {
       
   594             if (pollWrapper == null)
       
   595                 throw new ClosedSelectorException();
       
   596             // make sure this sk has not been removed yet
       
   597             int index = sk.getIndex();
       
   598             if (index == -1)
       
   599                 throw new CancelledKeyException();
       
   600             pollWrapper.putEventOps(index, ops);
       
   601         }
       
   602     }
       
   603 
       
   604     public Selector wakeup() {
       
   605         synchronized (interruptLock) {
       
   606             if (!interruptTriggered) {
       
   607                 setWakeupSocket();
       
   608                 interruptTriggered = true;
       
   609             }
       
   610         }
       
   611         return this;
       
   612     }
       
   613 
       
   614     static {
       
   615         IOUtil.load();
       
   616     }
       
   617 }