36 import java.util.Deque; |
36 import java.util.Deque; |
37 import java.util.HashMap; |
37 import java.util.HashMap; |
38 import java.util.List; |
38 import java.util.List; |
39 import java.util.Map; |
39 import java.util.Map; |
40 import java.util.function.Consumer; |
40 import java.util.function.Consumer; |
|
41 import jdk.internal.misc.Unsafe; |
41 |
42 |
42 /** |
43 /** |
43 * A multi-threaded implementation of Selector for Windows. |
44 * A multi-threaded implementation of Selector for Windows. |
44 * |
45 * |
45 * @author Konstantin Kladko |
46 * @author Konstantin Kladko |
46 * @author Mark Reinhold |
47 * @author Mark Reinhold |
47 */ |
48 */ |
48 |
49 |
49 class WindowsSelectorImpl extends SelectorImpl { |
50 class WindowsSelectorImpl extends SelectorImpl { |
|
51 private static final Unsafe unsafe = Unsafe.getUnsafe(); |
|
52 private static int addressSize = unsafe.addressSize(); |
|
53 |
|
54 private static int dependsArch(int value32, int value64) { |
|
55 return (addressSize == 4) ? value32 : value64; |
|
56 } |
|
57 |
50 // Initial capacity of the poll array |
58 // Initial capacity of the poll array |
51 private final int INIT_CAP = 8; |
59 private final int INIT_CAP = 8; |
52 // Maximum number of sockets for select(). |
60 // Maximum number of sockets for select(). |
53 // Should be INIT_CAP times a power of 2 |
61 // Should be INIT_CAP times a power of 2 |
54 private static final int MAX_SELECTABLE_FDS = 1024; |
62 private static final int MAX_SELECTABLE_FDS = 1024; |
|
63 |
|
64 // Size of FD_SET struct to allocate a buffer for it in SubSelector, |
|
65 // aligned to 8 bytes on 64-bit: |
|
66 // struct { unsigned int fd_count; SOCKET fd_array[MAX_SELECTABLE_FDS]; }. |
|
67 private static final long SIZEOF_FD_SET = dependsArch( |
|
68 4 + MAX_SELECTABLE_FDS * 4, // SOCKET = unsigned int |
|
69 4 + MAX_SELECTABLE_FDS * 8 + 4); // SOCKET = unsigned __int64 |
55 |
70 |
56 // The list of SelectableChannels serviced by this Selector. Every mod |
71 // The list of SelectableChannels serviced by this Selector. Every mod |
57 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll |
72 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll |
58 // array, where the corresponding entry is occupied by the wakeupSocket |
73 // array, where the corresponding entry is occupied by the wakeupSocket |
59 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; |
74 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; |
324 // The first element of each array is the number of selected sockets. |
339 // The first element of each array is the number of selected sockets. |
325 // Other elements are file descriptors of selected sockets. |
340 // Other elements are file descriptors of selected sockets. |
326 private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; |
341 private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; |
327 private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; |
342 private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; |
328 private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; |
343 private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; |
|
344 // Buffer for readfds, writefds and exceptfds structs that are passed |
|
345 // to native select(). |
|
346 private final long fdsBuffer = unsafe.allocateMemory(SIZEOF_FD_SET * 3); |
329 |
347 |
330 private SubSelector() { |
348 private SubSelector() { |
331 this.pollArrayIndex = 0; // main thread |
349 this.pollArrayIndex = 0; // main thread |
332 } |
350 } |
333 |
351 |
336 } |
354 } |
337 |
355 |
338 private int poll() throws IOException{ // poll for the main thread |
356 private int poll() throws IOException{ // poll for the main thread |
339 return poll0(pollWrapper.pollArrayAddress, |
357 return poll0(pollWrapper.pollArrayAddress, |
340 Math.min(totalChannels, MAX_SELECTABLE_FDS), |
358 Math.min(totalChannels, MAX_SELECTABLE_FDS), |
341 readFds, writeFds, exceptFds, timeout); |
359 readFds, writeFds, exceptFds, timeout, fdsBuffer); |
342 } |
360 } |
343 |
361 |
344 private int poll(int index) throws IOException { |
362 private int poll(int index) throws IOException { |
345 // poll for helper threads |
363 // poll for helper threads |
346 return poll0(pollWrapper.pollArrayAddress + |
364 return poll0(pollWrapper.pollArrayAddress + |
347 (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), |
365 (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), |
348 Math.min(MAX_SELECTABLE_FDS, |
366 Math.min(MAX_SELECTABLE_FDS, |
349 totalChannels - (index + 1) * MAX_SELECTABLE_FDS), |
367 totalChannels - (index + 1) * MAX_SELECTABLE_FDS), |
350 readFds, writeFds, exceptFds, timeout); |
368 readFds, writeFds, exceptFds, timeout, fdsBuffer); |
351 } |
369 } |
352 |
370 |
353 private native int poll0(long pollAddress, int numfds, |
371 private native int poll0(long pollAddress, int numfds, |
354 int[] readFds, int[] writeFds, int[] exceptFds, long timeout); |
372 int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer); |
355 |
373 |
356 private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) { |
374 private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) { |
357 int numKeysUpdated = 0; |
375 int numKeysUpdated = 0; |
358 numKeysUpdated += processFDSet(updateCount, action, readFds, |
376 numKeysUpdated += processFDSet(updateCount, action, readFds, |
359 Net.POLLIN, |
377 Net.POLLIN, |
439 } |
461 } |
440 public void run() { |
462 public void run() { |
441 while (true) { // poll loop |
463 while (true) { // poll loop |
442 // wait for the start of poll. If this thread has become |
464 // wait for the start of poll. If this thread has become |
443 // redundant, then exit. |
465 // redundant, then exit. |
444 if (startLock.waitForStart(this)) |
466 if (startLock.waitForStart(this)) { |
|
467 subSelector.freeFDSetBuffer(); |
445 return; |
468 return; |
|
469 } |
446 // call poll() |
470 // call poll() |
447 try { |
471 try { |
448 subSelector.poll(index); |
472 subSelector.poll(index); |
449 } catch (IOException e) { |
473 } catch (IOException e) { |
450 // Save this exception and let other threads finish. |
474 // Save this exception and let other threads finish. |