jdk/src/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
author chegar
Sat, 04 Feb 2012 07:29:11 +0000
changeset 11823 ee83ae88512d
parent 7668 d4a77089c587
child 14342 8435a30053c1
permissions -rw-r--r--
7041778: Move SCTP implementation out of sun.nio.ch and into its own package Reviewed-by: alanb
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     1
/*
7668
d4a77089c587 6962318: Update copyright year
ohair
parents: 5983
diff changeset
     2
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
90ce3da70b43 Initial load
duke
parents:
diff changeset
     4
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
90ce3da70b43 Initial load
duke
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2445
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2445
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    10
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
90ce3da70b43 Initial load
duke
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
90ce3da70b43 Initial load
duke
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
90ce3da70b43 Initial load
duke
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
90ce3da70b43 Initial load
duke
parents:
diff changeset
    15
 * accompanied this code).
90ce3da70b43 Initial load
duke
parents:
diff changeset
    16
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
90ce3da70b43 Initial load
duke
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    20
 *
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2445
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2445
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2445
diff changeset
    23
 * questions.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    24
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    25
90ce3da70b43 Initial load
duke
parents:
diff changeset
    26
/*
90ce3da70b43 Initial load
duke
parents:
diff changeset
    27
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    28
90ce3da70b43 Initial load
duke
parents:
diff changeset
    29
90ce3da70b43 Initial load
duke
parents:
diff changeset
    30
package sun.nio.ch;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    31
90ce3da70b43 Initial load
duke
parents:
diff changeset
    32
import java.nio.channels.spi.SelectorProvider;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    33
import java.nio.channels.Selector;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    34
import java.nio.channels.ClosedSelectorException;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    35
import java.nio.channels.Pipe;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    36
import java.nio.channels.SelectableChannel;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    37
import java.io.IOException;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    38
import java.util.List;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    39
import java.util.ArrayList;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    40
import java.util.HashMap;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    41
import java.util.Iterator;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    42
90ce3da70b43 Initial load
duke
parents:
diff changeset
    43
/**
90ce3da70b43 Initial load
duke
parents:
diff changeset
    44
 * A multi-threaded implementation of Selector for Windows.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    45
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    46
 * @author Konstantin Kladko
90ce3da70b43 Initial load
duke
parents:
diff changeset
    47
 * @author Mark Reinhold
90ce3da70b43 Initial load
duke
parents:
diff changeset
    48
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    49
90ce3da70b43 Initial load
duke
parents:
diff changeset
    50
final class WindowsSelectorImpl extends SelectorImpl {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    51
    // Initial capacity of the poll array
90ce3da70b43 Initial load
duke
parents:
diff changeset
    52
    private final int INIT_CAP = 8;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    53
    // Maximum number of sockets for select().
90ce3da70b43 Initial load
duke
parents:
diff changeset
    54
    // Should be INIT_CAP times a power of 2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    55
    private final static int MAX_SELECTABLE_FDS = 1024;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    56
90ce3da70b43 Initial load
duke
parents:
diff changeset
    57
    // The list of SelectableChannels serviced by this Selector. Every mod
90ce3da70b43 Initial load
duke
parents:
diff changeset
    58
    // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
90ce3da70b43 Initial load
duke
parents:
diff changeset
    59
    // array,  where the corresponding entry is occupied by the wakeupSocket
90ce3da70b43 Initial load
duke
parents:
diff changeset
    60
    private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
90ce3da70b43 Initial load
duke
parents:
diff changeset
    61
90ce3da70b43 Initial load
duke
parents:
diff changeset
    62
    // The global native poll array holds file descriptors and event masks
90ce3da70b43 Initial load
duke
parents:
diff changeset
    63
    private PollArrayWrapper pollWrapper;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    64
90ce3da70b43 Initial load
duke
parents:
diff changeset
    65
    // The number of valid entries in  poll array, including entries occupied
90ce3da70b43 Initial load
duke
parents:
diff changeset
    66
    // by wakeup socket handle.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    67
    private int totalChannels = 1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    68
90ce3da70b43 Initial load
duke
parents:
diff changeset
    69
    // Number of helper threads needed for select. We need one thread per
90ce3da70b43 Initial load
duke
parents:
diff changeset
    70
    // each additional set of MAX_SELECTABLE_FDS - 1 channels.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    71
    private int threadsCount = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    72
90ce3da70b43 Initial load
duke
parents:
diff changeset
    73
    // A list of helper threads for select.
2445
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
    74
    private final List<SelectThread> threads = new ArrayList<SelectThread>();
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    75
90ce3da70b43 Initial load
duke
parents:
diff changeset
    76
    //Pipe used as a wakeup object.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    77
    private final Pipe wakeupPipe;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    78
90ce3da70b43 Initial load
duke
parents:
diff changeset
    79
    // File descriptors corresponding to source and sink
90ce3da70b43 Initial load
duke
parents:
diff changeset
    80
    private final int wakeupSourceFd, wakeupSinkFd;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    81
1449
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
    82
    // Lock for close cleanup
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
    83
    private Object closeLock = new Object();
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
    84
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    85
    // Maps file descriptors to their indices in  pollArray
90ce3da70b43 Initial load
duke
parents:
diff changeset
    86
    private final static class FdMap extends HashMap<Integer, MapEntry> {
895
67f1dc69ad10 6726309: Compiler warnings in nio code
alanb
parents: 2
diff changeset
    87
        static final long serialVersionUID = 0L;
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    88
        private MapEntry get(int desc) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    89
            return get(new Integer(desc));
90ce3da70b43 Initial load
duke
parents:
diff changeset
    90
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
    91
        private MapEntry put(SelectionKeyImpl ski) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    92
            return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
90ce3da70b43 Initial load
duke
parents:
diff changeset
    93
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
    94
        private MapEntry remove(SelectionKeyImpl ski) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    95
            Integer fd = new Integer(ski.channel.getFDVal());
90ce3da70b43 Initial load
duke
parents:
diff changeset
    96
            MapEntry x = get(fd);
90ce3da70b43 Initial load
duke
parents:
diff changeset
    97
            if ((x != null) && (x.ski.channel == ski.channel))
90ce3da70b43 Initial load
duke
parents:
diff changeset
    98
                return remove(fd);
90ce3da70b43 Initial load
duke
parents:
diff changeset
    99
            return null;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   100
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   101
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   102
90ce3da70b43 Initial load
duke
parents:
diff changeset
   103
    // class for fdMap entries
90ce3da70b43 Initial load
duke
parents:
diff changeset
   104
    private final static class MapEntry {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   105
        SelectionKeyImpl ski;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   106
        long updateCount = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   107
        long clearedCount = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   108
        MapEntry(SelectionKeyImpl ski) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   109
            this.ski = ski;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   110
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   111
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   112
    private final FdMap fdMap = new FdMap();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   113
90ce3da70b43 Initial load
duke
parents:
diff changeset
   114
    // SubSelector for the main thread
90ce3da70b43 Initial load
duke
parents:
diff changeset
   115
    private final SubSelector subSelector = new SubSelector();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   116
90ce3da70b43 Initial load
duke
parents:
diff changeset
   117
    private long timeout; //timeout for poll
90ce3da70b43 Initial load
duke
parents:
diff changeset
   118
90ce3da70b43 Initial load
duke
parents:
diff changeset
   119
    // Lock for interrupt triggering and clearing
90ce3da70b43 Initial load
duke
parents:
diff changeset
   120
    private final Object interruptLock = new Object();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   121
    private volatile boolean interruptTriggered = false;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   122
90ce3da70b43 Initial load
duke
parents:
diff changeset
   123
    WindowsSelectorImpl(SelectorProvider sp) throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   124
        super(sp);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   125
        pollWrapper = new PollArrayWrapper(INIT_CAP);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   126
        wakeupPipe = Pipe.open();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   127
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   128
90ce3da70b43 Initial load
duke
parents:
diff changeset
   129
        // Disable the Nagle algorithm so that the wakeup is more immediate
90ce3da70b43 Initial load
duke
parents:
diff changeset
   130
        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   131
        (sink.sc).socket().setTcpNoDelay(true);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   132
        wakeupSinkFd = ((SelChImpl)sink).getFDVal();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   133
90ce3da70b43 Initial load
duke
parents:
diff changeset
   134
        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   135
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   136
90ce3da70b43 Initial load
duke
parents:
diff changeset
   137
    protected int doSelect(long timeout) throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   138
        if (channelArray == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   139
            throw new ClosedSelectorException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   140
        this.timeout = timeout; // set selector timeout
90ce3da70b43 Initial load
duke
parents:
diff changeset
   141
        processDeregisterQueue();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   142
        if (interruptTriggered) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   143
            resetWakeupSocket();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   144
            return 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   145
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   146
        // Calculate number of helper threads needed for poll. If necessary
90ce3da70b43 Initial load
duke
parents:
diff changeset
   147
        // threads are created here and start waiting on startLock
90ce3da70b43 Initial load
duke
parents:
diff changeset
   148
        adjustThreadsCount();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   149
        finishLock.reset(); // reset finishLock
90ce3da70b43 Initial load
duke
parents:
diff changeset
   150
        // Wakeup helper threads, waiting on startLock, so they start polling.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   151
        // Redundant threads will exit here after wakeup.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   152
        startLock.startThreads();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   153
        // do polling in the main thread. Main thread is responsible for
90ce3da70b43 Initial load
duke
parents:
diff changeset
   154
        // first MAX_SELECTABLE_FDS entries in pollArray.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   155
        try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   156
            begin();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   157
            try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   158
                subSelector.poll();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   159
            } catch (IOException e) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   160
                finishLock.setException(e); // Save this exception
90ce3da70b43 Initial load
duke
parents:
diff changeset
   161
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   162
            // Main thread is out of poll(). Wakeup others and wait for them
90ce3da70b43 Initial load
duke
parents:
diff changeset
   163
            if (threads.size() > 0)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   164
                finishLock.waitForHelperThreads();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   165
          } finally {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   166
              end();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   167
          }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   168
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   169
        finishLock.checkForException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   170
        processDeregisterQueue();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   171
        int updated = updateSelectedKeys();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   172
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   173
        resetWakeupSocket();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   174
        return updated;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   175
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   176
90ce3da70b43 Initial load
duke
parents:
diff changeset
   177
    // Helper threads wait on this lock for the next poll.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   178
    private final StartLock startLock = new StartLock();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   179
90ce3da70b43 Initial load
duke
parents:
diff changeset
   180
    private final class StartLock {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   181
        // A variable which distinguishes the current run of doSelect from the
90ce3da70b43 Initial load
duke
parents:
diff changeset
   182
        // previous one. Incrementing runsCounter and notifying threads will
90ce3da70b43 Initial load
duke
parents:
diff changeset
   183
        // trigger another round of poll.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   184
        private long runsCounter;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   185
       // Triggers threads, waiting on this lock to start polling.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   186
        private synchronized void startThreads() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   187
            runsCounter++; // next run
90ce3da70b43 Initial load
duke
parents:
diff changeset
   188
            notifyAll(); // wake up threads.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   189
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   190
        // This function is called by a helper thread to wait for the
90ce3da70b43 Initial load
duke
parents:
diff changeset
   191
        // next round of poll(). It also checks, if this thread became
90ce3da70b43 Initial load
duke
parents:
diff changeset
   192
        // redundant. If yes, it returns true, notifying the thread
90ce3da70b43 Initial load
duke
parents:
diff changeset
   193
        // that it should exit.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   194
        private synchronized boolean waitForStart(SelectThread thread) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   195
            while (true) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   196
                while (runsCounter == thread.lastRun) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   197
                    try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   198
                        startLock.wait();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   199
                    } catch (InterruptedException e) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   200
                        Thread.currentThread().interrupt();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   201
                    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   202
                }
2445
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   203
                if (thread.isZombie()) { // redundant thread
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   204
                    return true; // will cause run() to exit.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   205
                } else {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   206
                    thread.lastRun = runsCounter; // update lastRun
90ce3da70b43 Initial load
duke
parents:
diff changeset
   207
                    return false; //   will cause run() to poll.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   208
                }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   209
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   210
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   211
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   212
90ce3da70b43 Initial load
duke
parents:
diff changeset
   213
    // Main thread waits on this lock, until all helper threads are done
90ce3da70b43 Initial load
duke
parents:
diff changeset
   214
    // with poll().
90ce3da70b43 Initial load
duke
parents:
diff changeset
   215
    private final FinishLock finishLock = new FinishLock();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   216
90ce3da70b43 Initial load
duke
parents:
diff changeset
   217
    private final class FinishLock  {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   218
        // Number of helper threads, that did not finish yet.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   219
        private int threadsToFinish;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   220
90ce3da70b43 Initial load
duke
parents:
diff changeset
   221
        // IOException which occured during the last run.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   222
        IOException exception = null;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   223
90ce3da70b43 Initial load
duke
parents:
diff changeset
   224
        // Called before polling.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   225
        private void reset() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   226
            threadsToFinish = threads.size(); // helper threads
90ce3da70b43 Initial load
duke
parents:
diff changeset
   227
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   228
90ce3da70b43 Initial load
duke
parents:
diff changeset
   229
        // Each helper thread invokes this function on finishLock, when
90ce3da70b43 Initial load
duke
parents:
diff changeset
   230
        // the thread is done with poll().
90ce3da70b43 Initial load
duke
parents:
diff changeset
   231
        private synchronized void threadFinished() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   232
            if (threadsToFinish == threads.size()) { // finished poll() first
90ce3da70b43 Initial load
duke
parents:
diff changeset
   233
                // if finished first, wakeup others
90ce3da70b43 Initial load
duke
parents:
diff changeset
   234
                wakeup();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   235
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   236
            threadsToFinish--;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   237
            if (threadsToFinish == 0) // all helper threads finished poll().
90ce3da70b43 Initial load
duke
parents:
diff changeset
   238
                notify();             // notify the main thread
90ce3da70b43 Initial load
duke
parents:
diff changeset
   239
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   240
90ce3da70b43 Initial load
duke
parents:
diff changeset
   241
        // The main thread invokes this function on finishLock to wait
90ce3da70b43 Initial load
duke
parents:
diff changeset
   242
        // for helper threads to finish poll().
90ce3da70b43 Initial load
duke
parents:
diff changeset
   243
        private synchronized void waitForHelperThreads() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   244
            if (threadsToFinish == threads.size()) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   245
                // no helper threads finished yet. Wakeup them up.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   246
                wakeup();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   247
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   248
            while (threadsToFinish != 0) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   249
                try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   250
                    finishLock.wait();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   251
                } catch (InterruptedException e) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   252
                    // Interrupted - set interrupted state.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   253
                    Thread.currentThread().interrupt();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   254
                }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   255
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   256
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   257
90ce3da70b43 Initial load
duke
parents:
diff changeset
   258
        // sets IOException for this run
90ce3da70b43 Initial load
duke
parents:
diff changeset
   259
        private synchronized void setException(IOException e) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   260
            exception = e;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   261
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   262
90ce3da70b43 Initial load
duke
parents:
diff changeset
   263
        // Checks if there was any exception during the last run.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   264
        // If yes, throws it
90ce3da70b43 Initial load
duke
parents:
diff changeset
   265
        private void checkForException() throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   266
            if (exception == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   267
                return;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   268
            StringBuffer message =  new StringBuffer("An exception occured" +
90ce3da70b43 Initial load
duke
parents:
diff changeset
   269
                                       " during the execution of select(): \n");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   270
            message.append(exception);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   271
            message.append('\n');
90ce3da70b43 Initial load
duke
parents:
diff changeset
   272
            exception = null;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   273
            throw new IOException(message.toString());
90ce3da70b43 Initial load
duke
parents:
diff changeset
   274
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   275
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   276
90ce3da70b43 Initial load
duke
parents:
diff changeset
   277
    private final class SubSelector {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   278
        private final int pollArrayIndex; // starting index in pollArray to poll
90ce3da70b43 Initial load
duke
parents:
diff changeset
   279
        // These arrays will hold result of native select().
90ce3da70b43 Initial load
duke
parents:
diff changeset
   280
        // The first element of each array is the number of selected sockets.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   281
        // Other elements are file descriptors of selected sockets.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   282
        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
90ce3da70b43 Initial load
duke
parents:
diff changeset
   283
        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
90ce3da70b43 Initial load
duke
parents:
diff changeset
   284
        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
90ce3da70b43 Initial load
duke
parents:
diff changeset
   285
90ce3da70b43 Initial load
duke
parents:
diff changeset
   286
        // Creates the sub-selector used by the main thread, which polls
        // from the very beginning of the poll array.
        private SubSelector() {
            pollArrayIndex = 0;
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   289
90ce3da70b43 Initial load
duke
parents:
diff changeset
   290
        // Creates the sub-selector used by a helper thread. Helper number
        // threadIndex starts polling (threadIndex + 1) * MAX_SELECTABLE_FDS
        // entries into the poll array.
        private SubSelector(int threadIndex) {
            int precedingThreads = threadIndex + 1;
            pollArrayIndex = precedingThreads * MAX_SELECTABLE_FDS;
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   293
90ce3da70b43 Initial load
duke
parents:
diff changeset
   294
        private int poll() throws IOException{ // poll for the main thread
90ce3da70b43 Initial load
duke
parents:
diff changeset
   295
            return poll0(pollWrapper.pollArrayAddress,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   296
                         Math.min(totalChannels, MAX_SELECTABLE_FDS),
90ce3da70b43 Initial load
duke
parents:
diff changeset
   297
                         readFds, writeFds, exceptFds, timeout);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   298
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   299
90ce3da70b43 Initial load
duke
parents:
diff changeset
   300
        private int poll(int index) throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   301
            // poll for helper threads
90ce3da70b43 Initial load
duke
parents:
diff changeset
   302
            return  poll0(pollWrapper.pollArrayAddress +
90ce3da70b43 Initial load
duke
parents:
diff changeset
   303
                     (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
90ce3da70b43 Initial load
duke
parents:
diff changeset
   304
                     Math.min(MAX_SELECTABLE_FDS,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   305
                             totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
90ce3da70b43 Initial load
duke
parents:
diff changeset
   306
                     readFds, writeFds, exceptFds, timeout);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   307
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   308
90ce3da70b43 Initial load
duke
parents:
diff changeset
   309
        private native int poll0(long pollAddress, int numfds,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   310
             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   311
90ce3da70b43 Initial load
duke
parents:
diff changeset
   312
        private int processSelectedKeys(long updateCount) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   313
            int numKeysUpdated = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   314
            numKeysUpdated += processFDSet(updateCount, readFds,
5983
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   315
                                           PollArrayWrapper.POLLIN,
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   316
                                           false);
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   317
            numKeysUpdated += processFDSet(updateCount, writeFds,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   318
                                           PollArrayWrapper.POLLCONN |
5983
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   319
                                           PollArrayWrapper.POLLOUT,
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   320
                                           false);
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   321
            numKeysUpdated += processFDSet(updateCount, exceptFds,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   322
                                           PollArrayWrapper.POLLIN |
90ce3da70b43 Initial load
duke
parents:
diff changeset
   323
                                           PollArrayWrapper.POLLCONN |
5983
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   324
                                           PollArrayWrapper.POLLOUT,
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   325
                                           true);
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   326
            return numKeysUpdated;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   327
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   328
90ce3da70b43 Initial load
duke
parents:
diff changeset
   329
        /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   330
         * Note, clearedCount is used to determine if the readyOps have
90ce3da70b43 Initial load
duke
parents:
diff changeset
   331
         * been reset in this select operation. updateCount is used to
90ce3da70b43 Initial load
duke
parents:
diff changeset
   332
         * tell if a key has been counted as updated in this select
90ce3da70b43 Initial load
duke
parents:
diff changeset
   333
         * operation.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   334
         *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   335
         * me.updateCount <= me.clearedCount <= updateCount
90ce3da70b43 Initial load
duke
parents:
diff changeset
   336
         */
5983
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   337
        private int processFDSet(long updateCount, int[] fds, int rOps,
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   338
                                 boolean isExceptFds)
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   339
        {
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   340
            int numKeysUpdated = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   341
            for (int i = 1; i <= fds[0]; i++) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   342
                int desc = fds[i];
90ce3da70b43 Initial load
duke
parents:
diff changeset
   343
                if (desc == wakeupSourceFd) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   344
                    synchronized (interruptLock) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   345
                        interruptTriggered = true;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   346
                    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   347
                    continue;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   348
                }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   349
                MapEntry me = fdMap.get(desc);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   350
                // If me is null, the key was deregistered in the previous
90ce3da70b43 Initial load
duke
parents:
diff changeset
   351
                // processDeregisterQueue.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   352
                if (me == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   353
                    continue;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   354
                SelectionKeyImpl sk = me.ski;
5983
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   355
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   356
                // The descriptor may be in the exceptfds set because there is
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   357
                // OOB data queued to the socket. If there is OOB data then it
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   358
                // is discarded and the key is not added to the selected set.
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   359
                if (isExceptFds &&
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   360
                    (sk.channel() instanceof SocketChannelImpl) &&
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   361
                    discardUrgentData(desc))
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   362
                {
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   363
                    continue;
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   364
                }
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   365
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   366
                if (selectedKeys.contains(sk)) { // Key in selected set
90ce3da70b43 Initial load
duke
parents:
diff changeset
   367
                    if (me.clearedCount != updateCount) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   368
                        if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
90ce3da70b43 Initial load
duke
parents:
diff changeset
   369
                            (me.updateCount != updateCount)) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   370
                            me.updateCount = updateCount;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   371
                            numKeysUpdated++;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   372
                        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   373
                    } else { // The readyOps have been set; now add
90ce3da70b43 Initial load
duke
parents:
diff changeset
   374
                        if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
90ce3da70b43 Initial load
duke
parents:
diff changeset
   375
                            (me.updateCount != updateCount)) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   376
                            me.updateCount = updateCount;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   377
                            numKeysUpdated++;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   378
                        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   379
                    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   380
                    me.clearedCount = updateCount;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   381
                } else { // Key is not in selected set yet
90ce3da70b43 Initial load
duke
parents:
diff changeset
   382
                    if (me.clearedCount != updateCount) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   383
                        sk.channel.translateAndSetReadyOps(rOps, sk);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   384
                        if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   385
                            selectedKeys.add(sk);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   386
                            me.updateCount = updateCount;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   387
                            numKeysUpdated++;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   388
                        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   389
                    } else { // The readyOps have been set; now add
90ce3da70b43 Initial load
duke
parents:
diff changeset
   390
                        sk.channel.translateAndUpdateReadyOps(rOps, sk);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   391
                        if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   392
                            selectedKeys.add(sk);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   393
                            me.updateCount = updateCount;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   394
                            numKeysUpdated++;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   395
                        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   396
                    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   397
                    me.clearedCount = updateCount;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   398
                }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   399
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   400
            return numKeysUpdated;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   401
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   402
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   403
90ce3da70b43 Initial load
duke
parents:
diff changeset
   404
    // Represents a helper thread used for select.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   405
    private final class SelectThread extends Thread {
2445
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   406
        private final int index; // index of this thread
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   407
        final SubSelector subSelector;
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   408
        private long lastRun = 0; // last run number
2445
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   409
        private volatile boolean zombie;
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   410
        // Creates a new thread
90ce3da70b43 Initial load
duke
parents:
diff changeset
   411
        private SelectThread(int i) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   412
            this.index = i;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   413
            this.subSelector = new SubSelector(i);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   414
            //make sure we wait for next round of poll
90ce3da70b43 Initial load
duke
parents:
diff changeset
   415
            this.lastRun = startLock.runsCounter;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   416
        }
2445
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   417
        void makeZombie() {
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   418
            zombie = true;
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   419
        }
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   420
        boolean isZombie() {
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   421
            return zombie;
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   422
        }
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   423
        public void run() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   424
            while (true) { // poll loop
90ce3da70b43 Initial load
duke
parents:
diff changeset
   425
                // wait for the start of poll. If this thread has become
90ce3da70b43 Initial load
duke
parents:
diff changeset
   426
                // redundant, then exit.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   427
                if (startLock.waitForStart(this))
90ce3da70b43 Initial load
duke
parents:
diff changeset
   428
                    return;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   429
                // call poll()
90ce3da70b43 Initial load
duke
parents:
diff changeset
   430
                try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   431
                    subSelector.poll(index);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   432
                } catch (IOException e) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   433
                    // Save this exception and let other threads finish.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   434
                    finishLock.setException(e);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   435
                }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   436
                // notify main thread, that this thread has finished, and
90ce3da70b43 Initial load
duke
parents:
diff changeset
   437
                // wakeup others, if this thread is the first to finish.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   438
                finishLock.threadFinished();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   439
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   440
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   441
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   442
90ce3da70b43 Initial load
duke
parents:
diff changeset
   443
    // After some channels registered/deregistered, the number of required
90ce3da70b43 Initial load
duke
parents:
diff changeset
   444
    // helper threads may have changed. Adjust this number.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   445
    private void adjustThreadsCount() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   446
        if (threadsCount > threads.size()) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   447
            // More threads needed. Start more threads.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   448
            for (int i = threads.size(); i < threadsCount; i++) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   449
                SelectThread newThread = new SelectThread(i);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   450
                threads.add(newThread);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   451
                newThread.setDaemon(true);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   452
                newThread.start();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   453
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   454
        } else if (threadsCount < threads.size()) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   455
            // Some threads become redundant. Remove them from the threads List.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   456
            for (int i = threads.size() - 1 ; i >= threadsCount; i--)
2445
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   457
                threads.remove(i).makeZombie();
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   458
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   459
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   460
90ce3da70b43 Initial load
duke
parents:
diff changeset
   461
    // Sets Windows wakeup socket to a signaled state.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   462
    private void setWakeupSocket() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   463
        setWakeupSocket0(wakeupSinkFd);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   464
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   465
    // Native helper behind setWakeupSocket(), which calls it with
    // wakeupSinkFd. The native implementation is not part of this file.
    private native void setWakeupSocket0(int wakeupSinkFd);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   466
90ce3da70b43 Initial load
duke
parents:
diff changeset
   467
    // Sets Windows wakeup socket to a non-signaled state.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   468
    private void resetWakeupSocket() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   469
        synchronized (interruptLock) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   470
            if (interruptTriggered == false)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   471
                return;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   472
            resetWakeupSocket0(wakeupSourceFd);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   473
            interruptTriggered = false;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   474
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   475
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   476
90ce3da70b43 Initial load
duke
parents:
diff changeset
   477
    // Native helper behind resetWakeupSocket(), which calls it with
    // wakeupSourceFd while holding interruptLock. The native implementation
    // is not part of this file.
    private native void resetWakeupSocket0(int wakeupSourceFd);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   478
5983
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   479
    // Native helper used by processFDSet for socket channels found in the
    // exceptfds set: when it returns true the key is skipped and not added
    // to the selected set. Per the comment at that call site, this is the
    // case where OOB (urgent) data was queued and has been discarded.
    // The native implementation is not part of this file.
    private native boolean discardUrgentData(int fd);
b5bc332cd233 6213702: (so) non-blocking sockets with TCP urgent disabled get still selected for read ops (win)
alanb
parents: 5506
diff changeset
   480
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   481
    // We increment this counter on each call to updateSelectedKeys().
    // Each MapEntry in fdMap has a memorized value of updateCount. When we
    // increment numKeysUpdated we set updateCount for the corresponding
    // entry to the counter's current value. This is used to avoid counting
    // the same key more than once - the same key can appear in readfds and
    // writefds.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   487
    private long updateCount = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   488
90ce3da70b43 Initial load
duke
parents:
diff changeset
   489
    // Update ops of the corresponding Channels. Add the ready keys to the
90ce3da70b43 Initial load
duke
parents:
diff changeset
   490
    // ready queue.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   491
    private int updateSelectedKeys() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   492
        updateCount++;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   493
        int numKeysUpdated = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   494
        numKeysUpdated += subSelector.processSelectedKeys(updateCount);
2445
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   495
        for (SelectThread t: threads) {
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   496
            numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   497
        }
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   498
        return numKeysUpdated;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   499
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   500
90ce3da70b43 Initial load
duke
parents:
diff changeset
   501
    protected void implClose() throws IOException {
1449
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   502
        synchronized (closeLock) {
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   503
            if (channelArray != null) {
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   504
                if (pollWrapper != null) {
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   505
                    // prevent further wakeup
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   506
                    synchronized (interruptLock) {
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   507
                        interruptTriggered = true;
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   508
                    }
1449
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   509
                    wakeupPipe.sink().close();
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   510
                    wakeupPipe.source().close();
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   511
                    for(int i = 1; i < totalChannels; i++) { // Deregister channels
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   512
                        if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   513
                            deregister(channelArray[i]);
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   514
                            SelectableChannel selch = channelArray[i].channel();
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   515
                            if (!selch.isOpen() && !selch.isRegistered())
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   516
                                ((SelChImpl)selch).kill();
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   517
                        }
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   518
                    }
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   519
                    pollWrapper.free();
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   520
                    pollWrapper = null;
2445
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   521
                    selectedKeys = null;
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   522
                    channelArray = null;
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   523
                    // Make all remaining helper threads exit
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   524
                    for (SelectThread t: threads)
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   525
                         t.makeZombie();
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   526
                    startLock.startThreads();
a1fa6863fc50 6823609: (se) Selector.select hangs on Windows under load
alanb
parents: 1639
diff changeset
   527
                }
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   528
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   529
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   530
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   531
90ce3da70b43 Initial load
duke
parents:
diff changeset
   532
    protected void implRegister(SelectionKeyImpl ski) {
1449
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   533
        synchronized (closeLock) {
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   534
            if (pollWrapper == null)
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   535
                throw new ClosedSelectorException();
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   536
            growIfNeeded();
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   537
            channelArray[totalChannels] = ski;
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   538
            ski.setIndex(totalChannels);
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   539
            fdMap.put(ski);
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   540
            keys.add(ski);
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   541
            pollWrapper.addEntry(totalChannels, ski);
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   542
            totalChannels++;
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   543
        }
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   544
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   545
90ce3da70b43 Initial load
duke
parents:
diff changeset
   546
    private void growIfNeeded() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   547
        if (channelArray.length == totalChannels) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   548
            int newSize = totalChannels * 2; // Make a larger array
90ce3da70b43 Initial load
duke
parents:
diff changeset
   549
            SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
90ce3da70b43 Initial load
duke
parents:
diff changeset
   550
            System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   551
            channelArray = temp;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   552
            pollWrapper.grow(newSize);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   553
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   554
        if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
90ce3da70b43 Initial load
duke
parents:
diff changeset
   555
            pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   556
            totalChannels++;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   557
            threadsCount++;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   558
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   559
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   560
90ce3da70b43 Initial load
duke
parents:
diff changeset
   561
    protected void implDereg(SelectionKeyImpl ski) throws IOException{
90ce3da70b43 Initial load
duke
parents:
diff changeset
   562
        int i = ski.getIndex();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   563
        assert (i >= 0);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   564
        if (i != totalChannels - 1) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   565
            // Copy end one over it
90ce3da70b43 Initial load
duke
parents:
diff changeset
   566
            SelectionKeyImpl endChannel = channelArray[totalChannels-1];
90ce3da70b43 Initial load
duke
parents:
diff changeset
   567
            channelArray[i] = endChannel;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   568
            endChannel.setIndex(i);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   569
            pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   570
                                                                pollWrapper, i);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   571
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   572
        channelArray[totalChannels - 1] = null;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   573
        totalChannels--;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   574
        ski.setIndex(-1);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   575
        if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   576
            totalChannels--;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   577
            threadsCount--; // The last thread has become redundant.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   578
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   579
        fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
90ce3da70b43 Initial load
duke
parents:
diff changeset
   580
        keys.remove(ski);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   581
        selectedKeys.remove(ski);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   582
        deregister(ski);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   583
        SelectableChannel selch = ski.channel();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   584
        if (!selch.isOpen() && !selch.isRegistered())
90ce3da70b43 Initial load
duke
parents:
diff changeset
   585
            ((SelChImpl)selch).kill();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   586
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   587
11823
ee83ae88512d 7041778: Move SCTP implementation out of sun.nio.ch and into its own package
chegar
parents: 7668
diff changeset
   588
    public void putEventOps(SelectionKeyImpl sk, int ops) {
1449
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   589
        synchronized (closeLock) {
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   590
            if (pollWrapper == null)
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   591
                throw new ClosedSelectorException();
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   592
            pollWrapper.putEventOps(sk.getIndex(), ops);
2ed6188288d6 5025260: Register methods should throw ClosedChannelException instead of NPE
sherman
parents: 1247
diff changeset
   593
        }
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   594
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   595
90ce3da70b43 Initial load
duke
parents:
diff changeset
   596
    public Selector wakeup() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   597
        synchronized (interruptLock) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   598
            if (!interruptTriggered) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   599
                setWakeupSocket();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   600
                interruptTriggered = true;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   601
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   602
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   603
        return this;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   604
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   605
90ce3da70b43 Initial load
duke
parents:
diff changeset
   606
    // Class initialization: runs Util.load() once, before the class is used.
    // NOTE(review): presumably this loads the native nio library; confirm
    // against sun.nio.ch.Util.
    static {
        Util.load();
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   609
}