src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
changeset 47216 71c04702a3d5
parent 37521 b6e0f285c998
child 49248 15a0e60c8b97
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java	Tue Sep 12 19:03:39 2017 +0200
@@ -0,0 +1,617 @@
+/*
+ * Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ */
+
+
+package sun.nio.ch;
+
+import java.nio.channels.spi.SelectorProvider;
+import java.nio.channels.Selector;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.Pipe;
+import java.nio.channels.SelectableChannel;
+import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * A multi-threaded implementation of Selector for Windows.
+ *
+ * @author Konstantin Kladko
+ * @author Mark Reinhold
+ */
+
+final class WindowsSelectorImpl extends SelectorImpl {
+    // Initial capacity of the poll array
+    private final int INIT_CAP = 8;
+    // Maximum number of sockets for select().
+    // Should be INIT_CAP times a power of 2
+    private static final int MAX_SELECTABLE_FDS = 1024;
+
+    // The list of SelectableChannels serviced by this Selector. Every mod
+    // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
+    // array,  where the corresponding entry is occupied by the wakeupSocket
+    private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
+
+    // The global native poll array holds file decriptors and event masks
+    private PollArrayWrapper pollWrapper;
+
+    // The number of valid entries in  poll array, including entries occupied
+    // by wakeup socket handle.
+    private int totalChannels = 1;
+
+    // Number of helper threads needed for select. We need one thread per
+    // each additional set of MAX_SELECTABLE_FDS - 1 channels.
+    private int threadsCount = 0;
+
+    // A list of helper threads for select.
+    private final List<SelectThread> threads = new ArrayList<SelectThread>();
+
+    //Pipe used as a wakeup object.
+    private final Pipe wakeupPipe;
+
+    // File descriptors corresponding to source and sink
+    private final int wakeupSourceFd, wakeupSinkFd;
+
+    // Lock for close cleanup
+    private Object closeLock = new Object();
+
+    // Maps file descriptors to their indices in  pollArray
+    private static final class FdMap extends HashMap<Integer, MapEntry> {
+        static final long serialVersionUID = 0L;
+        private MapEntry get(int desc) {
+            return get(Integer.valueOf(desc));
+        }
+        private MapEntry put(SelectionKeyImpl ski) {
+            return put(Integer.valueOf(ski.channel.getFDVal()), new MapEntry(ski));
+        }
+        private MapEntry remove(SelectionKeyImpl ski) {
+            Integer fd = Integer.valueOf(ski.channel.getFDVal());
+            MapEntry x = get(fd);
+            if ((x != null) && (x.ski.channel == ski.channel))
+                return remove(fd);
+            return null;
+        }
+    }
+
+    // class for fdMap entries
+    private static final class MapEntry {
+        SelectionKeyImpl ski;
+        long updateCount = 0;
+        long clearedCount = 0;
+        MapEntry(SelectionKeyImpl ski) {
+            this.ski = ski;
+        }
+    }
+    private final FdMap fdMap = new FdMap();
+
+    // SubSelector for the main thread
+    private final SubSelector subSelector = new SubSelector();
+
+    private long timeout; //timeout for poll
+
+    // Lock for interrupt triggering and clearing
+    private final Object interruptLock = new Object();
+    private volatile boolean interruptTriggered;
+
+    WindowsSelectorImpl(SelectorProvider sp) throws IOException {
+        super(sp);
+        pollWrapper = new PollArrayWrapper(INIT_CAP);
+        wakeupPipe = Pipe.open();
+        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
+
+        // Disable the Nagle algorithm so that the wakeup is more immediate
+        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
+        (sink.sc).socket().setTcpNoDelay(true);
+        wakeupSinkFd = ((SelChImpl)sink).getFDVal();
+
+        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
+    }
+
+    protected int doSelect(long timeout) throws IOException {
+        if (channelArray == null)
+            throw new ClosedSelectorException();
+        this.timeout = timeout; // set selector timeout
+        processDeregisterQueue();
+        if (interruptTriggered) {
+            resetWakeupSocket();
+            return 0;
+        }
+        // Calculate number of helper threads needed for poll. If necessary
+        // threads are created here and start waiting on startLock
+        adjustThreadsCount();
+        finishLock.reset(); // reset finishLock
+        // Wakeup helper threads, waiting on startLock, so they start polling.
+        // Redundant threads will exit here after wakeup.
+        startLock.startThreads();
+        // do polling in the main thread. Main thread is responsible for
+        // first MAX_SELECTABLE_FDS entries in pollArray.
+        try {
+            begin();
+            try {
+                subSelector.poll();
+            } catch (IOException e) {
+                finishLock.setException(e); // Save this exception
+            }
+            // Main thread is out of poll(). Wakeup others and wait for them
+            if (threads.size() > 0)
+                finishLock.waitForHelperThreads();
+          } finally {
+              end();
+          }
+        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
+        finishLock.checkForException();
+        processDeregisterQueue();
+        int updated = updateSelectedKeys();
+        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
+        resetWakeupSocket();
+        return updated;
+    }
+
+    // Helper threads wait on this lock for the next poll.
+    private final StartLock startLock = new StartLock();
+
+    private final class StartLock {
+        // A variable which distinguishes the current run of doSelect from the
+        // previous one. Incrementing runsCounter and notifying threads will
+        // trigger another round of poll.
+        private long runsCounter;
+       // Triggers threads, waiting on this lock to start polling.
+        private synchronized void startThreads() {
+            runsCounter++; // next run
+            notifyAll(); // wake up threads.
+        }
+        // This function is called by a helper thread to wait for the
+        // next round of poll(). It also checks, if this thread became
+        // redundant. If yes, it returns true, notifying the thread
+        // that it should exit.
+        private synchronized boolean waitForStart(SelectThread thread) {
+            while (true) {
+                while (runsCounter == thread.lastRun) {
+                    try {
+                        startLock.wait();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                if (thread.isZombie()) { // redundant thread
+                    return true; // will cause run() to exit.
+                } else {
+                    thread.lastRun = runsCounter; // update lastRun
+                    return false; //   will cause run() to poll.
+                }
+            }
+        }
+    }
+
+    // Main thread waits on this lock, until all helper threads are done
+    // with poll().
+    private final FinishLock finishLock = new FinishLock();
+
+    private final class FinishLock  {
+        // Number of helper threads, that did not finish yet.
+        private int threadsToFinish;
+
+        // IOException which occurred during the last run.
+        IOException exception = null;
+
+        // Called before polling.
+        private void reset() {
+            threadsToFinish = threads.size(); // helper threads
+        }
+
+        // Each helper thread invokes this function on finishLock, when
+        // the thread is done with poll().
+        private synchronized void threadFinished() {
+            if (threadsToFinish == threads.size()) { // finished poll() first
+                // if finished first, wakeup others
+                wakeup();
+            }
+            threadsToFinish--;
+            if (threadsToFinish == 0) // all helper threads finished poll().
+                notify();             // notify the main thread
+        }
+
+        // The main thread invokes this function on finishLock to wait
+        // for helper threads to finish poll().
+        private synchronized void waitForHelperThreads() {
+            if (threadsToFinish == threads.size()) {
+                // no helper threads finished yet. Wakeup them up.
+                wakeup();
+            }
+            while (threadsToFinish != 0) {
+                try {
+                    finishLock.wait();
+                } catch (InterruptedException e) {
+                    // Interrupted - set interrupted state.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        // sets IOException for this run
+        private synchronized void setException(IOException e) {
+            exception = e;
+        }
+
+        // Checks if there was any exception during the last run.
+        // If yes, throws it
+        private void checkForException() throws IOException {
+            if (exception == null)
+                return;
+            StringBuffer message =  new StringBuffer("An exception occurred" +
+                                       " during the execution of select(): \n");
+            message.append(exception);
+            message.append('\n');
+            exception = null;
+            throw new IOException(message.toString());
+        }
+    }
+
+    private final class SubSelector {
+        private final int pollArrayIndex; // starting index in pollArray to poll
+        // These arrays will hold result of native select().
+        // The first element of each array is the number of selected sockets.
+        // Other elements are file descriptors of selected sockets.
+        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
+        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
+        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
+
+        private SubSelector() {
+            this.pollArrayIndex = 0; // main thread
+        }
+
+        private SubSelector(int threadIndex) { // helper threads
+            this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
+        }
+
+        private int poll() throws IOException{ // poll for the main thread
+            return poll0(pollWrapper.pollArrayAddress,
+                         Math.min(totalChannels, MAX_SELECTABLE_FDS),
+                         readFds, writeFds, exceptFds, timeout);
+        }
+
+        private int poll(int index) throws IOException {
+            // poll for helper threads
+            return  poll0(pollWrapper.pollArrayAddress +
+                     (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
+                     Math.min(MAX_SELECTABLE_FDS,
+                             totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
+                     readFds, writeFds, exceptFds, timeout);
+        }
+
+        private native int poll0(long pollAddress, int numfds,
+             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
+
+        private int processSelectedKeys(long updateCount) {
+            int numKeysUpdated = 0;
+            numKeysUpdated += processFDSet(updateCount, readFds,
+                                           Net.POLLIN,
+                                           false);
+            numKeysUpdated += processFDSet(updateCount, writeFds,
+                                           Net.POLLCONN |
+                                           Net.POLLOUT,
+                                           false);
+            numKeysUpdated += processFDSet(updateCount, exceptFds,
+                                           Net.POLLIN |
+                                           Net.POLLCONN |
+                                           Net.POLLOUT,
+                                           true);
+            return numKeysUpdated;
+        }
+
+        /**
+         * Note, clearedCount is used to determine if the readyOps have
+         * been reset in this select operation. updateCount is used to
+         * tell if a key has been counted as updated in this select
+         * operation.
+         *
+         * me.updateCount <= me.clearedCount <= updateCount
+         */
+        private int processFDSet(long updateCount, int[] fds, int rOps,
+                                 boolean isExceptFds)
+        {
+            int numKeysUpdated = 0;
+            for (int i = 1; i <= fds[0]; i++) {
+                int desc = fds[i];
+                if (desc == wakeupSourceFd) {
+                    synchronized (interruptLock) {
+                        interruptTriggered = true;
+                    }
+                    continue;
+                }
+                MapEntry me = fdMap.get(desc);
+                // If me is null, the key was deregistered in the previous
+                // processDeregisterQueue.
+                if (me == null)
+                    continue;
+                SelectionKeyImpl sk = me.ski;
+
+                // The descriptor may be in the exceptfds set because there is
+                // OOB data queued to the socket. If there is OOB data then it
+                // is discarded and the key is not added to the selected set.
+                if (isExceptFds &&
+                    (sk.channel() instanceof SocketChannelImpl) &&
+                    discardUrgentData(desc))
+                {
+                    continue;
+                }
+
+                if (selectedKeys.contains(sk)) { // Key in selected set
+                    if (me.clearedCount != updateCount) {
+                        if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
+                            (me.updateCount != updateCount)) {
+                            me.updateCount = updateCount;
+                            numKeysUpdated++;
+                        }
+                    } else { // The readyOps have been set; now add
+                        if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
+                            (me.updateCount != updateCount)) {
+                            me.updateCount = updateCount;
+                            numKeysUpdated++;
+                        }
+                    }
+                    me.clearedCount = updateCount;
+                } else { // Key is not in selected set yet
+                    if (me.clearedCount != updateCount) {
+                        sk.channel.translateAndSetReadyOps(rOps, sk);
+                        if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
+                            selectedKeys.add(sk);
+                            me.updateCount = updateCount;
+                            numKeysUpdated++;
+                        }
+                    } else { // The readyOps have been set; now add
+                        sk.channel.translateAndUpdateReadyOps(rOps, sk);
+                        if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
+                            selectedKeys.add(sk);
+                            me.updateCount = updateCount;
+                            numKeysUpdated++;
+                        }
+                    }
+                    me.clearedCount = updateCount;
+                }
+            }
+            return numKeysUpdated;
+        }
+    }
+
+    // Represents a helper thread used for select.
+    private final class SelectThread extends Thread {
+        private final int index; // index of this thread
+        final SubSelector subSelector;
+        private long lastRun = 0; // last run number
+        private volatile boolean zombie;
+        // Creates a new thread
+        private SelectThread(int i) {
+            super(null, null, "SelectorHelper", 0, false);
+            this.index = i;
+            this.subSelector = new SubSelector(i);
+            //make sure we wait for next round of poll
+            this.lastRun = startLock.runsCounter;
+        }
+        void makeZombie() {
+            zombie = true;
+        }
+        boolean isZombie() {
+            return zombie;
+        }
+        public void run() {
+            while (true) { // poll loop
+                // wait for the start of poll. If this thread has become
+                // redundant, then exit.
+                if (startLock.waitForStart(this))
+                    return;
+                // call poll()
+                try {
+                    subSelector.poll(index);
+                } catch (IOException e) {
+                    // Save this exception and let other threads finish.
+                    finishLock.setException(e);
+                }
+                // notify main thread, that this thread has finished, and
+                // wakeup others, if this thread is the first to finish.
+                finishLock.threadFinished();
+            }
+        }
+    }
+
+    // After some channels registered/deregistered, the number of required
+    // helper threads may have changed. Adjust this number.
+    private void adjustThreadsCount() {
+        if (threadsCount > threads.size()) {
+            // More threads needed. Start more threads.
+            for (int i = threads.size(); i < threadsCount; i++) {
+                SelectThread newThread = new SelectThread(i);
+                threads.add(newThread);
+                newThread.setDaemon(true);
+                newThread.start();
+            }
+        } else if (threadsCount < threads.size()) {
+            // Some threads become redundant. Remove them from the threads List.
+            for (int i = threads.size() - 1 ; i >= threadsCount; i--)
+                threads.remove(i).makeZombie();
+        }
+    }
+
+    // Sets Windows wakeup socket to a signaled state.
+    private void setWakeupSocket() {
+        setWakeupSocket0(wakeupSinkFd);
+    }
+    private native void setWakeupSocket0(int wakeupSinkFd);
+
+    // Sets Windows wakeup socket to a non-signaled state.
+    private void resetWakeupSocket() {
+        synchronized (interruptLock) {
+            if (interruptTriggered == false)
+                return;
+            resetWakeupSocket0(wakeupSourceFd);
+            interruptTriggered = false;
+        }
+    }
+
+    private native void resetWakeupSocket0(int wakeupSourceFd);
+
+    private native boolean discardUrgentData(int fd);
+
+    // We increment this counter on each call to updateSelectedKeys()
+    // each entry in  SubSelector.fdsMap has a memorized value of
+    // updateCount. When we increment numKeysUpdated we set updateCount
+    // for the corresponding entry to its current value. This is used to
+    // avoid counting the same key more than once - the same key can
+    // appear in readfds and writefds.
+    private long updateCount = 0;
+
+    // Update ops of the corresponding Channels. Add the ready keys to the
+    // ready queue.
+    private int updateSelectedKeys() {
+        updateCount++;
+        int numKeysUpdated = 0;
+        numKeysUpdated += subSelector.processSelectedKeys(updateCount);
+        for (SelectThread t: threads) {
+            numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
+        }
+        return numKeysUpdated;
+    }
+
+    protected void implClose() throws IOException {
+        synchronized (closeLock) {
+            if (channelArray != null) {
+                if (pollWrapper != null) {
+                    // prevent further wakeup
+                    synchronized (interruptLock) {
+                        interruptTriggered = true;
+                    }
+                    wakeupPipe.sink().close();
+                    wakeupPipe.source().close();
+                    for(int i = 1; i < totalChannels; i++) { // Deregister channels
+                        if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
+                            deregister(channelArray[i]);
+                            SelectableChannel selch = channelArray[i].channel();
+                            if (!selch.isOpen() && !selch.isRegistered())
+                                ((SelChImpl)selch).kill();
+                        }
+                    }
+                    pollWrapper.free();
+                    pollWrapper = null;
+                    selectedKeys = null;
+                    channelArray = null;
+                    // Make all remaining helper threads exit
+                    for (SelectThread t: threads)
+                         t.makeZombie();
+                    startLock.startThreads();
+                }
+            }
+        }
+    }
+
+    protected void implRegister(SelectionKeyImpl ski) {
+        synchronized (closeLock) {
+            if (pollWrapper == null)
+                throw new ClosedSelectorException();
+            growIfNeeded();
+            channelArray[totalChannels] = ski;
+            ski.setIndex(totalChannels);
+            fdMap.put(ski);
+            keys.add(ski);
+            pollWrapper.addEntry(totalChannels, ski);
+            totalChannels++;
+        }
+    }
+
+    private void growIfNeeded() {
+        if (channelArray.length == totalChannels) {
+            int newSize = totalChannels * 2; // Make a larger array
+            SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
+            System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
+            channelArray = temp;
+            pollWrapper.grow(newSize);
+        }
+        if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
+            pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
+            totalChannels++;
+            threadsCount++;
+        }
+    }
+
+    protected void implDereg(SelectionKeyImpl ski) throws IOException{
+        int i = ski.getIndex();
+        assert (i >= 0);
+        synchronized (closeLock) {
+            if (i != totalChannels - 1) {
+                // Copy end one over it
+                SelectionKeyImpl endChannel = channelArray[totalChannels-1];
+                channelArray[i] = endChannel;
+                endChannel.setIndex(i);
+                pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
+                                                                pollWrapper, i);
+            }
+            ski.setIndex(-1);
+        }
+        channelArray[totalChannels - 1] = null;
+        totalChannels--;
+        if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
+            totalChannels--;
+            threadsCount--; // The last thread has become redundant.
+        }
+        fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
+        keys.remove(ski);
+        selectedKeys.remove(ski);
+        deregister(ski);
+        SelectableChannel selch = ski.channel();
+        if (!selch.isOpen() && !selch.isRegistered())
+            ((SelChImpl)selch).kill();
+    }
+
+    public void putEventOps(SelectionKeyImpl sk, int ops) {
+        synchronized (closeLock) {
+            if (pollWrapper == null)
+                throw new ClosedSelectorException();
+            // make sure this sk has not been removed yet
+            int index = sk.getIndex();
+            if (index == -1)
+                throw new CancelledKeyException();
+            pollWrapper.putEventOps(index, ops);
+        }
+    }
+
+    public Selector wakeup() {
+        synchronized (interruptLock) {
+            if (!interruptTriggered) {
+                setWakeupSocket();
+                interruptTriggered = true;
+            }
+        }
+        return this;
+    }
+
+    static {
+        IOUtil.load();
+    }
+}