--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/solaris/demo/jni/Poller/PollingServer.java Sat Dec 01 00:00:00 2007 +0000
@@ -0,0 +1,256 @@
+/*
+ * Copyright 1999-2001 Sun Microsystems, Inc. All Rights Reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * - Neither the name of Sun Microsystems nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+ * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.io.*;
+import java.net.*;
+import java.lang.Byte;
+
+/**
+ * Simple Java "server" using the Poller class
+ * to multiplex on incoming connections. Note
+ * that handoff of events, via linked Q is not
+ * actually be a performance booster here, since
+ * the processing of events is cheaper than
+ * the overhead in scheduling/executing them.
+ * Although this demo does allow for concurrency
+ * in handling connections, it uses a rather
+ * primitive "gang scheduling" policy to keep
+ * the code simpler.
+ */
+
+public class PollingServer
+{
+ public final static int MAXCONN = 10000;
+ public final static int PORTNUM = 4444;
+ public final static int BYTESPEROP = 10;
+
+ /**
+ * This synchronization object protects access to certain
+ * data (bytesRead,eventsToProcess) by concurrent Consumer threads.
+ */
+ private final static Object eventSync = new Object();
+
+ private static InputStream[] instr = new InputStream[MAXCONN];
+ private static int[] mapping = new int[65535];
+ private static LinkedQueue linkedQ = new LinkedQueue();
+ private static int bytesRead = 0;
+ private static int bytesToRead;
+ private static int eventsToProcess=0;
+
+ public PollingServer(int concurrency) {
+ Socket[] sockArr = new Socket[MAXCONN];
+ long timestart, timestop;
+ short[] revents = new short[MAXCONN];
+ int[] fds = new int[MAXCONN];
+ int bytes;
+ Poller Mux;
+ int serverFd;
+ int totalConn=0;
+ int connects=0;
+
+ System.out.println ("Serv: Initializing port " + PORTNUM);
+ try {
+
+ ServerSocket skMain = new ServerSocket (PORTNUM);
+ /*
+ * Create the Poller object Mux, allow for up to MAXCONN
+ * sockets/filedescriptors to be polled.
+ */
+ Mux = new Poller(MAXCONN);
+ serverFd = Mux.add(skMain, Poller.POLLIN);
+
+ Socket ctrlSock = skMain.accept();
+
+ BufferedReader ctrlReader =
+ new BufferedReader(new InputStreamReader(ctrlSock.getInputStream()));
+ String ctrlString = ctrlReader.readLine();
+ bytesToRead = Integer.valueOf(ctrlString).intValue();
+ ctrlString = ctrlReader.readLine();
+ totalConn = Integer.valueOf(ctrlString).intValue();
+
+ System.out.println("Receiving " + bytesToRead + " bytes from " +
+ totalConn + " client connections");
+
+ timestart = System.currentTimeMillis();
+
+ /*
+ * Start the consumer threads to read data.
+ */
+ for (int consumerThread = 0;
+ consumerThread < concurrency; consumerThread++ ) {
+ new Consumer(consumerThread).start();
+ }
+
+ /*
+ * Take connections, read Data
+ */
+ int numEvents=0;
+
+ while ( bytesRead < bytesToRead ) {
+
+ int loopWaits=0;
+ while (eventsToProcess > 0) {
+ synchronized (eventSync) {
+ loopWaits++;
+ if (eventsToProcess <= 0) break;
+ try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();};
+ }
+ }
+ if (loopWaits > 1)
+ System.out.println("Done waiting...loops = " + loopWaits +
+ " events " + numEvents +
+ " bytes read : " + bytesRead );
+
+ if (bytesRead >= bytesToRead) break; // may be done!
+
+ /*
+ * Wait for events
+ */
+ numEvents = Mux.waitMultiple(100, fds, revents);
+ synchronized (eventSync) {
+ eventsToProcess = numEvents;
+ }
+ /*
+ * Process all the events we got from Mux.waitMultiple
+ */
+ int cnt = 0;
+ while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) {
+ int fd = fds[cnt];
+
+ if (revents[cnt] == Poller.POLLIN) {
+ if (fd == serverFd) {
+ /*
+ * New connection coming in on the ServerSocket
+ * Add the socket to the Mux, keep track of mapping
+ * the fdval returned by Mux.add to the connection.
+ */
+ sockArr[connects] = skMain.accept();
+ instr[connects] = sockArr[connects].getInputStream();
+ int fdval = Mux.add(sockArr[connects], Poller.POLLIN);
+ mapping[fdval] = connects;
+ synchronized(eventSync) {
+ eventsToProcess--; // just processed this one!
+ }
+ connects++;
+ } else {
+ /*
+ * We've got data from this client connection.
+ * Put it on the queue for the consumer threads to process.
+ */
+ linkedQ.put(new Integer(fd));
+ }
+ } else {
+ System.out.println("Got revents[" + cnt + "] == " + revents[cnt]);
+ }
+ cnt++;
+ }
+ }
+ timestop = System.currentTimeMillis();
+ System.out.println("Time for all reads (" + totalConn +
+ " sockets) : " + (timestop-timestart));
+
+ // Tell the client it can now go away
+ byte[] buff = new byte[BYTESPEROP];
+ ctrlSock.getOutputStream().write(buff,0,BYTESPEROP);
+
+ // Tell the cunsumer threads they can exit.
+ for (int cThread = 0; cThread < concurrency; cThread++ ) {
+ linkedQ.put(new Integer(-1));
+ }
+ } catch (Exception exc) { exc.printStackTrace(); }
+ }
+
+ /*
+ * main ... just check if a concurrency was specified
+ */
+ public static void main (String args[])
+ {
+ int concurrency;
+
+ if (args.length == 1)
+ concurrency = java.lang.Integer.valueOf(args[0]).intValue();
+ else
+ concurrency = Poller.getNumCPUs() + 1;
+ PollingServer server = new PollingServer(concurrency);
+ }
+
+ /*
+ * This class is for handling the Client data.
+ * The PollingServer spawns off a number of these based upon
+ * the number of CPUs (or concurrency argument).
+ * Each just loops grabbing events off the queue and
+ * processing them.
+ */
+ class Consumer extends Thread {
+ private int threadNumber;
+ public Consumer(int i) { threadNumber = i; }
+
+ public void run() {
+ byte[] buff = new byte[BYTESPEROP];
+ int bytes = 0;
+
+ InputStream instream;
+ while (bytesRead < bytesToRead) {
+ try {
+ Integer Fd = (Integer) linkedQ.take();
+ int fd = Fd.intValue();
+ if (fd == -1) break; /* got told we could exit */
+
+ /*
+ * We have to map the fd value returned from waitMultiple
+ * to the actual input stream associated with that fd.
+ * Take a look at how the Mux.add() was done to see how
+ * we stored that.
+ */
+ int map = mapping[fd];
+ instream = instr[map];
+ bytes = instream.read(buff,0,BYTESPEROP);
+ } catch (Exception e) { System.out.println(e.toString()); }
+
+ if (bytes > 0) {
+ /*
+ * Any real server would do some synchronized and some
+ * unsynchronized work on behalf of the client, and
+ * most likely send some data back...but this is a
+ * gross oversimplification.
+ */
+ synchronized(eventSync) {
+ bytesRead += bytes;
+ eventsToProcess--;
+ if (eventsToProcess <= 0) {
+ eventSync.notify();
+ }
+ }
+ }
+ }
+ }
+ }
+}