src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
branchunixdomainchannels
changeset 59004 84e08e00c29c
parent 58801 119ac9128c1b
parent 58999 6bc29ebe053e
equal deleted inserted replaced
59003:aaba7cc40951 59004:84e08e00c29c
    36 import java.util.Deque;
    36 import java.util.Deque;
    37 import java.util.HashMap;
    37 import java.util.HashMap;
    38 import java.util.List;
    38 import java.util.List;
    39 import java.util.Map;
    39 import java.util.Map;
    40 import java.util.function.Consumer;
    40 import java.util.function.Consumer;
       
    41 import jdk.internal.misc.Unsafe;
    41 
    42 
    42 /**
    43 /**
    43  * A multi-threaded implementation of Selector for Windows.
    44  * A multi-threaded implementation of Selector for Windows.
    44  *
    45  *
    45  * @author Konstantin Kladko
    46  * @author Konstantin Kladko
    46  * @author Mark Reinhold
    47  * @author Mark Reinhold
    47  */
    48  */
    48 
    49 
    49 class WindowsSelectorImpl extends SelectorImpl {
    50 class WindowsSelectorImpl extends SelectorImpl {
       
    51     private static final Unsafe unsafe = Unsafe.getUnsafe();
       
    52     private static int addressSize = unsafe.addressSize();
       
    53 
       
    54     private static int dependsArch(int value32, int value64) {
       
    55         return (addressSize == 4) ? value32 : value64;
       
    56     }
       
    57 
    50     // Initial capacity of the poll array
    58     // Initial capacity of the poll array
    51     private final int INIT_CAP = 8;
    59     private final int INIT_CAP = 8;
    52     // Maximum number of sockets for select().
    60     // Maximum number of sockets for select().
    53     // Should be INIT_CAP times a power of 2
    61     // Should be INIT_CAP times a power of 2
    54     private static final int MAX_SELECTABLE_FDS = 1024;
    62     private static final int MAX_SELECTABLE_FDS = 1024;
       
    63 
       
    64     // Size of FD_SET struct to allocate a buffer for it in SubSelector,
       
    65     // aligned to 8 bytes on 64-bit:
       
    66     // struct { unsigned int fd_count; SOCKET fd_array[MAX_SELECTABLE_FDS]; }.
       
    67     private static final long SIZEOF_FD_SET = dependsArch(
       
    68             4 + MAX_SELECTABLE_FDS * 4,      // SOCKET = unsigned int
       
    69             4 + MAX_SELECTABLE_FDS * 8 + 4); // SOCKET = unsigned __int64
    55 
    70 
    56     // The list of SelectableChannels serviced by this Selector. Every mod
    71     // The list of SelectableChannels serviced by this Selector. Every mod
    57     // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
    72     // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
    58     // array,  where the corresponding entry is occupied by the wakeupSocket
    73     // array,  where the corresponding entry is occupied by the wakeupSocket
    59     private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
    74     private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
   324         // The first element of each array is the number of selected sockets.
   339         // The first element of each array is the number of selected sockets.
   325         // Other elements are file descriptors of selected sockets.
   340         // Other elements are file descriptors of selected sockets.
   326         private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
   341         private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
   327         private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
   342         private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
   328         private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
   343         private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
       
   344         // Buffer for readfds, writefds and exceptfds structs that are passed
       
   345         // to native select().
       
   346         private final long fdsBuffer = unsafe.allocateMemory(SIZEOF_FD_SET * 3);
   329 
   347 
   330         private SubSelector() {
   348         private SubSelector() {
   331             this.pollArrayIndex = 0; // main thread
   349             this.pollArrayIndex = 0; // main thread
   332         }
   350         }
   333 
   351 
   336         }
   354         }
   337 
   355 
   338         private int poll() throws IOException{ // poll for the main thread
   356         private int poll() throws IOException{ // poll for the main thread
   339             return poll0(pollWrapper.pollArrayAddress,
   357             return poll0(pollWrapper.pollArrayAddress,
   340                          Math.min(totalChannels, MAX_SELECTABLE_FDS),
   358                          Math.min(totalChannels, MAX_SELECTABLE_FDS),
   341                          readFds, writeFds, exceptFds, timeout);
   359                          readFds, writeFds, exceptFds, timeout, fdsBuffer);
   342         }
   360         }
   343 
   361 
   344         private int poll(int index) throws IOException {
   362         private int poll(int index) throws IOException {
   345             // poll for helper threads
   363             // poll for helper threads
   346             return  poll0(pollWrapper.pollArrayAddress +
   364             return  poll0(pollWrapper.pollArrayAddress +
   347                      (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
   365                      (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
   348                      Math.min(MAX_SELECTABLE_FDS,
   366                      Math.min(MAX_SELECTABLE_FDS,
   349                              totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
   367                              totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
   350                      readFds, writeFds, exceptFds, timeout);
   368                      readFds, writeFds, exceptFds, timeout, fdsBuffer);
   351         }
   369         }
   352 
   370 
   353         private native int poll0(long pollAddress, int numfds,
   371         private native int poll0(long pollAddress, int numfds,
   354              int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
   372              int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer);
   355 
   373 
   356         private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
   374         private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
   357             int numKeysUpdated = 0;
   375             int numKeysUpdated = 0;
   358             numKeysUpdated += processFDSet(updateCount, action, readFds,
   376             numKeysUpdated += processFDSet(updateCount, action, readFds,
   359                                            Net.POLLIN,
   377                                            Net.POLLIN,
   413                     numKeysUpdated++;
   431                     numKeysUpdated++;
   414                 }
   432                 }
   415             }
   433             }
   416             return numKeysUpdated;
   434             return numKeysUpdated;
   417         }
   435         }
       
   436 
       
   437         private void freeFDSetBuffer() {
       
   438             unsafe.freeMemory(fdsBuffer);
       
   439         }
   418     }
   440     }
   419 
   441 
   420     // Represents a helper thread used for select.
   442     // Represents a helper thread used for select.
   421     private final class SelectThread extends Thread {
   443     private final class SelectThread extends Thread {
   422         private final int index; // index of this thread
   444         private final int index; // index of this thread
   439         }
   461         }
   440         public void run() {
   462         public void run() {
   441             while (true) { // poll loop
   463             while (true) { // poll loop
   442                 // wait for the start of poll. If this thread has become
   464                 // wait for the start of poll. If this thread has become
   443                 // redundant, then exit.
   465                 // redundant, then exit.
   444                 if (startLock.waitForStart(this))
   466                 if (startLock.waitForStart(this)) {
       
   467                     subSelector.freeFDSetBuffer();
   445                     return;
   468                     return;
       
   469                 }
   446                 // call poll()
   470                 // call poll()
   447                 try {
   471                 try {
   448                     subSelector.poll(index);
   472                     subSelector.poll(index);
   449                 } catch (IOException e) {
   473                 } catch (IOException e) {
   450                     // Save this exception and let other threads finish.
   474                     // Save this exception and let other threads finish.
   531 
   555 
   532         // Make all remaining helper threads exit
   556         // Make all remaining helper threads exit
   533         for (SelectThread t: threads)
   557         for (SelectThread t: threads)
   534              t.makeZombie();
   558              t.makeZombie();
   535         startLock.startThreads();
   559         startLock.startThreads();
       
   560         subSelector.freeFDSetBuffer();
   536     }
   561     }
   537 
   562 
   538     @Override
   563     @Override
   539     protected void implRegister(SelectionKeyImpl ski) {
   564     protected void implRegister(SelectionKeyImpl ski) {
   540         ensureOpen();
   565         ensureOpen();