21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
22 * or visit www.oracle.com if you need additional information or have any |
22 * or visit www.oracle.com if you need additional information or have any |
23 * questions. |
23 * questions. |
24 */ |
24 */ |
25 |
25 |
26 /* |
|
27 */ |
|
28 |
|
29 |
|
30 package sun.nio.ch; |
26 package sun.nio.ch; |
31 |
27 |
32 import java.nio.channels.spi.SelectorProvider; |
28 import java.io.IOException; |
33 import java.nio.channels.Selector; |
|
34 import java.nio.channels.ClosedSelectorException; |
29 import java.nio.channels.ClosedSelectorException; |
35 import java.nio.channels.Pipe; |
30 import java.nio.channels.Pipe; |
36 import java.nio.channels.SelectableChannel; |
31 import java.nio.channels.Selector; |
37 import java.io.IOException; |
32 import java.nio.channels.spi.SelectorProvider; |
38 import java.nio.channels.CancelledKeyException; |
33 import java.util.ArrayDeque; |
|
34 import java.util.ArrayList; |
|
35 import java.util.Deque; |
|
36 import java.util.HashMap; |
39 import java.util.List; |
37 import java.util.List; |
40 import java.util.ArrayList; |
38 import java.util.Map; |
41 import java.util.HashMap; |
|
42 import java.util.Iterator; |
|
43 |
39 |
44 /** |
40 /** |
45 * A multi-threaded implementation of Selector for Windows. |
41 * A multi-threaded implementation of Selector for Windows. |
46 * |
42 * |
47 * @author Konstantin Kladko |
43 * @author Konstantin Kladko |
78 private final Pipe wakeupPipe; |
74 private final Pipe wakeupPipe; |
79 |
75 |
80 // File descriptors corresponding to source and sink |
76 // File descriptors corresponding to source and sink |
81 private final int wakeupSourceFd, wakeupSinkFd; |
77 private final int wakeupSourceFd, wakeupSinkFd; |
82 |
78 |
83 // Lock for close cleanup |
|
84 private final Object closeLock = new Object(); |
|
85 |
|
86 // Maps file descriptors to their indices in pollArray |
79 // Maps file descriptors to their indices in pollArray |
87 private static final class FdMap extends HashMap<Integer, MapEntry> { |
80 private static final class FdMap extends HashMap<Integer, MapEntry> { |
88 static final long serialVersionUID = 0L; |
81 static final long serialVersionUID = 0L; |
89 private MapEntry get(int desc) { |
82 private MapEntry get(int desc) { |
90 return get(Integer.valueOf(desc)); |
83 return get(Integer.valueOf(desc)); |
101 } |
94 } |
102 } |
95 } |
103 |
96 |
104 // class for fdMap entries |
97 // class for fdMap entries |
105 private static final class MapEntry { |
98 private static final class MapEntry { |
106 SelectionKeyImpl ski; |
99 final SelectionKeyImpl ski; |
107 long updateCount = 0; |
100 long updateCount = 0; |
108 long clearedCount = 0; |
101 long clearedCount = 0; |
109 MapEntry(SelectionKeyImpl ski) { |
102 MapEntry(SelectionKeyImpl ski) { |
110 this.ski = ski; |
103 this.ski = ski; |
111 } |
104 } |
118 private long timeout; //timeout for poll |
111 private long timeout; //timeout for poll |
119 |
112 |
120 // Lock for interrupt triggering and clearing |
113 // Lock for interrupt triggering and clearing |
121 private final Object interruptLock = new Object(); |
114 private final Object interruptLock = new Object(); |
122 private volatile boolean interruptTriggered; |
115 private volatile boolean interruptTriggered; |
|
116 |
|
117 // pending new registrations/updates, queued by implRegister and putEventOps |
|
118 private final Object updateLock = new Object(); |
|
119 private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); |
|
120 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); |
|
121 private final Deque<Integer> updateEvents = new ArrayDeque<>(); |
|
122 |
123 |
123 |
124 WindowsSelectorImpl(SelectorProvider sp) throws IOException { |
124 WindowsSelectorImpl(SelectorProvider sp) throws IOException { |
125 super(sp); |
125 super(sp); |
126 pollWrapper = new PollArrayWrapper(INIT_CAP); |
126 pollWrapper = new PollArrayWrapper(INIT_CAP); |
127 wakeupPipe = Pipe.open(); |
127 wakeupPipe = Pipe.open(); |
133 wakeupSinkFd = ((SelChImpl)sink).getFDVal(); |
133 wakeupSinkFd = ((SelChImpl)sink).getFDVal(); |
134 |
134 |
135 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); |
135 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); |
136 } |
136 } |
137 |
137 |
|
138 private void ensureOpen() { |
|
139 if (!isOpen()) |
|
140 throw new ClosedSelectorException(); |
|
141 } |
|
142 |
138 @Override |
143 @Override |
139 protected int doSelect(long timeout) throws IOException { |
144 protected int doSelect(long timeout) throws IOException { |
140 if (channelArray == null) |
145 assert Thread.holdsLock(this); |
141 throw new ClosedSelectorException(); |
|
142 this.timeout = timeout; // set selector timeout |
146 this.timeout = timeout; // set selector timeout |
|
147 processUpdateQueue(); |
143 processDeregisterQueue(); |
148 processDeregisterQueue(); |
144 if (interruptTriggered) { |
149 if (interruptTriggered) { |
145 resetWakeupSocket(); |
150 resetWakeupSocket(); |
146 return 0; |
151 return 0; |
147 } |
152 } |
172 processDeregisterQueue(); |
177 processDeregisterQueue(); |
173 int updated = updateSelectedKeys(); |
178 int updated = updateSelectedKeys(); |
174 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. |
179 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. |
175 resetWakeupSocket(); |
180 resetWakeupSocket(); |
176 return updated; |
181 return updated; |
|
182 } |
|
183 |
|
184 /** |
|
185 * Process new registrations and changes to the interest ops. |
|
186 */ |
|
187 private void processUpdateQueue() { |
|
188 assert Thread.holdsLock(this); |
|
189 |
|
190 synchronized (updateLock) { |
|
191 SelectionKeyImpl ski; |
|
192 |
|
193 // new registrations |
|
194 while ((ski = newKeys.pollFirst()) != null) { |
|
195 if (ski.isValid()) { |
|
196 growIfNeeded(); |
|
197 channelArray[totalChannels] = ski; |
|
198 ski.setIndex(totalChannels); |
|
199 pollWrapper.putEntry(totalChannels, ski); |
|
200 totalChannels++; |
|
201 MapEntry previous = fdMap.put(ski); |
|
202 assert previous == null; |
|
203 } |
|
204 } |
|
205 |
|
206 // changes to interest ops |
|
207 assert updateKeys.size() == updateEvents.size(); |
|
208 while ((ski = updateKeys.pollFirst()) != null) { |
|
209 int events = updateEvents.pollFirst(); |
|
210 int fd = ski.channel.getFDVal(); |
|
211 if (ski.isValid() && fdMap.containsKey(fd)) { |
|
212 int index = ski.getIndex(); |
|
213 assert index >= 0 && index < totalChannels; |
|
214 pollWrapper.putEventOps(index, events); |
|
215 } |
|
216 } |
|
217 } |
177 } |
218 } |
178 |
219 |
179 // Helper threads wait on this lock for the next poll. |
220 // Helper threads wait on this lock for the next poll. |
180 private final StartLock startLock = new StartLock(); |
221 private final StartLock startLock = new StartLock(); |
181 |
222 |
501 return numKeysUpdated; |
542 return numKeysUpdated; |
502 } |
543 } |
503 |
544 |
504 @Override |
545 @Override |
505 protected void implClose() throws IOException { |
546 protected void implClose() throws IOException { |
506 synchronized (closeLock) { |
547 assert !isOpen(); |
507 if (channelArray != null) { |
548 assert Thread.holdsLock(this); |
508 if (pollWrapper != null) { |
549 |
509 // prevent further wakeup |
550 // prevent further wakeup |
510 synchronized (interruptLock) { |
551 synchronized (interruptLock) { |
511 interruptTriggered = true; |
552 interruptTriggered = true; |
512 } |
553 } |
513 wakeupPipe.sink().close(); |
554 |
514 wakeupPipe.source().close(); |
555 wakeupPipe.sink().close(); |
515 for(int i = 1; i < totalChannels; i++) { // Deregister channels |
556 wakeupPipe.source().close(); |
516 if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent |
557 pollWrapper.free(); |
517 deregister(channelArray[i]); |
558 |
518 SelectableChannel selch = channelArray[i].channel(); |
559 // Make all remaining helper threads exit |
519 if (!selch.isOpen() && !selch.isRegistered()) |
560 for (SelectThread t: threads) |
520 ((SelChImpl)selch).kill(); |
561 t.makeZombie(); |
521 } |
562 startLock.startThreads(); |
522 } |
563 } |
523 pollWrapper.free(); |
564 |
524 pollWrapper = null; |
565 @Override |
525 channelArray = null; |
|
526 // Make all remaining helper threads exit |
|
527 for (SelectThread t: threads) |
|
528 t.makeZombie(); |
|
529 startLock.startThreads(); |
|
530 } |
|
531 } |
|
532 } |
|
533 } |
|
534 |
|
535 protected void implRegister(SelectionKeyImpl ski) { |
566 protected void implRegister(SelectionKeyImpl ski) { |
536 synchronized (closeLock) { |
567 ensureOpen(); |
537 if (pollWrapper == null) |
568 synchronized (updateLock) { |
538 throw new ClosedSelectorException(); |
569 newKeys.addLast(ski); |
539 growIfNeeded(); |
|
540 channelArray[totalChannels] = ski; |
|
541 ski.setIndex(totalChannels); |
|
542 fdMap.put(ski); |
|
543 keys.add(ski); |
|
544 pollWrapper.addEntry(totalChannels, ski); |
|
545 totalChannels++; |
|
546 } |
570 } |
547 } |
571 } |
548 |
572 |
549 private void growIfNeeded() { |
573 private void growIfNeeded() { |
550 if (channelArray.length == totalChannels) { |
574 if (channelArray.length == totalChannels) { |
559 totalChannels++; |
583 totalChannels++; |
560 threadsCount++; |
584 threadsCount++; |
561 } |
585 } |
562 } |
586 } |
563 |
587 |
564 protected void implDereg(SelectionKeyImpl ski) throws IOException{ |
588 @Override |
565 int i = ski.getIndex(); |
589 protected void implDereg(SelectionKeyImpl ski) { |
566 assert (i >= 0); |
590 assert !ski.isValid(); |
567 synchronized (closeLock) { |
591 assert Thread.holdsLock(this); |
|
592 |
|
593 if (fdMap.remove(ski) != null) { |
|
594 int i = ski.getIndex(); |
|
595 assert (i >= 0); |
|
596 |
568 if (i != totalChannels - 1) { |
597 if (i != totalChannels - 1) { |
569 // Copy end one over it |
598 // Copy end one over it |
570 SelectionKeyImpl endChannel = channelArray[totalChannels-1]; |
599 SelectionKeyImpl endChannel = channelArray[totalChannels-1]; |
571 channelArray[i] = endChannel; |
600 channelArray[i] = endChannel; |
572 endChannel.setIndex(i); |
601 endChannel.setIndex(i); |
573 pollWrapper.replaceEntry(pollWrapper, totalChannels - 1, |
602 pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i); |
574 pollWrapper, i); |
|
575 } |
603 } |
576 ski.setIndex(-1); |
604 ski.setIndex(-1); |
577 } |
605 |
578 channelArray[totalChannels - 1] = null; |
606 channelArray[totalChannels - 1] = null; |
579 totalChannels--; |
|
580 if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) { |
|
581 totalChannels--; |
607 totalChannels--; |
582 threadsCount--; // The last thread has become redundant. |
608 if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) { |
583 } |
609 totalChannels--; |
584 fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys |
610 threadsCount--; // The last thread has become redundant. |
585 keys.remove(ski); |
611 } |
586 selectedKeys.remove(ski); |
612 } |
587 deregister(ski); |
613 } |
588 SelectableChannel selch = ski.channel(); |
614 |
589 if (!selch.isOpen() && !selch.isRegistered()) |
615 @Override |
590 ((SelChImpl)selch).kill(); |
616 public void putEventOps(SelectionKeyImpl ski, int events) { |
591 } |
617 ensureOpen(); |
592 |
618 synchronized (updateLock) { |
593 public void putEventOps(SelectionKeyImpl sk, int ops) { |
619 updateEvents.addLast(events); // events first in case adding key fails |
594 synchronized (closeLock) { |
620 updateKeys.addLast(ski); |
595 if (pollWrapper == null) |
621 } |
596 throw new ClosedSelectorException(); |
622 } |
597 // make sure this sk has not been removed yet |
623 |
598 int index = sk.getIndex(); |
624 @Override |
599 if (index == -1) |
|
600 throw new CancelledKeyException(); |
|
601 pollWrapper.putEventOps(index, ops); |
|
602 } |
|
603 } |
|
604 |
|
605 public Selector wakeup() { |
625 public Selector wakeup() { |
606 synchronized (interruptLock) { |
626 synchronized (interruptLock) { |
607 if (!interruptTriggered) { |
627 if (!interruptTriggered) { |
608 setWakeupSocket(); |
628 setWakeupSocket(); |
609 interruptTriggered = true; |
629 interruptTriggered = true; |