jdk/src/solaris/demo/jni/Poller/PollingServer.java
changeset 2 90ce3da70b43
child 5506 202f599c92aa
equal deleted inserted replaced
0:fd16c54261b3 2:90ce3da70b43
       
     1 /*
       
     2  * Copyright 1999-2001 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  *
       
     4  * Redistribution and use in source and binary forms, with or without
       
     5  * modification, are permitted provided that the following conditions
       
     6  * are met:
       
     7  *
       
     8  *   - Redistributions of source code must retain the above copyright
       
     9  *     notice, this list of conditions and the following disclaimer.
       
    10  *
       
    11  *   - Redistributions in binary form must reproduce the above copyright
       
    12  *     notice, this list of conditions and the following disclaimer in the
       
    13  *     documentation and/or other materials provided with the distribution.
       
    14  *
       
    15  *   - Neither the name of Sun Microsystems nor the names of its
       
    16  *     contributors may be used to endorse or promote products derived
       
    17  *     from this software without specific prior written permission.
       
    18  *
       
    19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
       
    20  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
       
    21  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
       
    22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
       
    23  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
       
    24  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
       
    25  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
       
    26  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
       
    27  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
       
    28  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
       
    29  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
       
    30  */
       
    31 
       
    32 import java.io.*;
       
    33 import java.net.*;
       
    34 import java.lang.Byte;
       
    35 
       
    36 /**
       
    37  * Simple Java "server" using the Poller class
       
    38  * to multiplex on incoming connections.  Note
       
    39  * that handoff of events, via linked Q is not
       
    40  * actually be a performance booster here, since
       
    41  * the processing of events is cheaper than
       
    42  * the overhead in scheduling/executing them.
       
    43  * Although this demo does allow for concurrency
       
    44  * in handling connections, it uses a rather
       
    45  * primitive "gang scheduling" policy to keep
       
    46  * the code simpler.
       
    47  */
       
    48 
       
    49 public class PollingServer
       
    50 {
       
    51   public final static int MAXCONN    = 10000;
       
    52   public final static int PORTNUM    = 4444;
       
    53   public final static int BYTESPEROP = 10;
       
    54 
       
    55   /**
       
    56    * This synchronization object protects access to certain
       
    57    * data (bytesRead,eventsToProcess) by concurrent Consumer threads.
       
    58    */
       
    59   private final static Object eventSync = new Object();
       
    60 
       
    61   private static InputStream[] instr = new InputStream[MAXCONN];
       
    62   private static int[] mapping = new int[65535];
       
    63   private static LinkedQueue linkedQ = new LinkedQueue();
       
    64   private static int bytesRead = 0;
       
    65   private static int bytesToRead;
       
    66   private static int eventsToProcess=0;
       
    67 
       
    68   public PollingServer(int concurrency) {
       
    69     Socket[] sockArr = new Socket[MAXCONN];
       
    70     long timestart, timestop;
       
    71     short[] revents = new short[MAXCONN];
       
    72     int[] fds = new int[MAXCONN];
       
    73     int bytes;
       
    74     Poller Mux;
       
    75     int serverFd;
       
    76     int totalConn=0;
       
    77     int connects=0;
       
    78 
       
    79     System.out.println ("Serv: Initializing port " + PORTNUM);
       
    80     try {
       
    81 
       
    82       ServerSocket skMain = new ServerSocket (PORTNUM);
       
    83       /*
       
    84        * Create the Poller object Mux, allow for up to MAXCONN
       
    85        * sockets/filedescriptors to be polled.
       
    86        */
       
    87       Mux = new Poller(MAXCONN);
       
    88       serverFd = Mux.add(skMain, Poller.POLLIN);
       
    89 
       
    90       Socket ctrlSock = skMain.accept();
       
    91 
       
    92       BufferedReader ctrlReader =
       
    93         new BufferedReader(new InputStreamReader(ctrlSock.getInputStream()));
       
    94       String ctrlString = ctrlReader.readLine();
       
    95       bytesToRead = Integer.valueOf(ctrlString).intValue();
       
    96       ctrlString = ctrlReader.readLine();
       
    97       totalConn = Integer.valueOf(ctrlString).intValue();
       
    98 
       
    99       System.out.println("Receiving " + bytesToRead + " bytes from " +
       
   100                          totalConn + " client connections");
       
   101 
       
   102       timestart = System.currentTimeMillis();
       
   103 
       
   104       /*
       
   105        * Start the consumer threads to read data.
       
   106        */
       
   107       for (int consumerThread = 0;
       
   108            consumerThread < concurrency; consumerThread++ ) {
       
   109         new Consumer(consumerThread).start();
       
   110       }
       
   111 
       
   112       /*
       
   113        * Take connections, read Data
       
   114        */
       
   115       int numEvents=0;
       
   116 
       
   117       while ( bytesRead < bytesToRead ) {
       
   118 
       
   119         int loopWaits=0;
       
   120         while (eventsToProcess > 0) {
       
   121           synchronized (eventSync) {
       
   122             loopWaits++;
       
   123             if (eventsToProcess <= 0) break;
       
   124             try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();};
       
   125           }
       
   126         }
       
   127         if (loopWaits > 1)
       
   128           System.out.println("Done waiting...loops = " + loopWaits +
       
   129                              " events " + numEvents +
       
   130                              " bytes read : " + bytesRead );
       
   131 
       
   132         if (bytesRead >= bytesToRead) break; // may be done!
       
   133 
       
   134         /*
       
   135          * Wait for events
       
   136          */
       
   137         numEvents = Mux.waitMultiple(100, fds, revents);
       
   138         synchronized (eventSync) {
       
   139           eventsToProcess = numEvents;
       
   140         }
       
   141         /*
       
   142          * Process all the events we got from Mux.waitMultiple
       
   143          */
       
   144         int cnt = 0;
       
   145         while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) {
       
   146           int fd = fds[cnt];
       
   147 
       
   148           if (revents[cnt] == Poller.POLLIN) {
       
   149             if (fd == serverFd) {
       
   150               /*
       
   151                * New connection coming in on the ServerSocket
       
   152                * Add the socket to the Mux, keep track of mapping
       
   153                * the fdval returned by Mux.add to the connection.
       
   154                */
       
   155               sockArr[connects] = skMain.accept();
       
   156               instr[connects] = sockArr[connects].getInputStream();
       
   157               int fdval = Mux.add(sockArr[connects], Poller.POLLIN);
       
   158               mapping[fdval] = connects;
       
   159               synchronized(eventSync) {
       
   160                 eventsToProcess--; // just processed this one!
       
   161               }
       
   162               connects++;
       
   163             } else {
       
   164               /*
       
   165                * We've got data from this client connection.
       
   166                * Put it on the queue for the consumer threads to process.
       
   167                */
       
   168               linkedQ.put(new Integer(fd));
       
   169             }
       
   170           } else {
       
   171             System.out.println("Got revents[" + cnt + "] == " + revents[cnt]);
       
   172           }
       
   173           cnt++;
       
   174         }
       
   175       }
       
   176       timestop = System.currentTimeMillis();
       
   177       System.out.println("Time for all reads (" + totalConn +
       
   178                          " sockets) : " + (timestop-timestart));
       
   179 
       
   180       // Tell the client it can now go away
       
   181       byte[] buff = new byte[BYTESPEROP];
       
   182       ctrlSock.getOutputStream().write(buff,0,BYTESPEROP);
       
   183 
       
   184       // Tell the cunsumer threads they can exit.
       
   185       for (int cThread = 0; cThread < concurrency; cThread++ ) {
       
   186         linkedQ.put(new Integer(-1));
       
   187       }
       
   188     } catch (Exception exc) { exc.printStackTrace(); }
       
   189   }
       
   190 
       
   191   /*
       
   192    * main ... just check if a concurrency was specified
       
   193    */
       
   194   public static void main (String args[])
       
   195   {
       
   196     int concurrency;
       
   197 
       
   198     if (args.length == 1)
       
   199       concurrency = java.lang.Integer.valueOf(args[0]).intValue();
       
   200     else
       
   201       concurrency = Poller.getNumCPUs() + 1;
       
   202     PollingServer server = new PollingServer(concurrency);
       
   203   }
       
   204 
       
   205   /*
       
   206    * This class is for handling the Client data.
       
   207    * The PollingServer spawns off a number of these based upon
       
   208    * the number of CPUs (or concurrency argument).
       
   209    * Each just loops grabbing events off the queue and
       
   210    * processing them.
       
   211    */
       
   212   class Consumer extends Thread {
       
   213     private int threadNumber;
       
   214     public Consumer(int i) { threadNumber = i; }
       
   215 
       
   216     public void run() {
       
   217       byte[] buff = new byte[BYTESPEROP];
       
   218       int bytes = 0;
       
   219 
       
   220       InputStream instream;
       
   221       while (bytesRead < bytesToRead) {
       
   222         try {
       
   223           Integer Fd = (Integer) linkedQ.take();
       
   224           int fd = Fd.intValue();
       
   225           if (fd == -1) break; /* got told we could exit */
       
   226 
       
   227           /*
       
   228            * We have to map the fd value returned from waitMultiple
       
   229            * to the actual input stream associated with that fd.
       
   230            * Take a look at how the Mux.add() was done to see how
       
   231            * we stored that.
       
   232            */
       
   233           int map = mapping[fd];
       
   234           instream = instr[map];
       
   235           bytes = instream.read(buff,0,BYTESPEROP);
       
   236         } catch (Exception e) { System.out.println(e.toString()); }
       
   237 
       
   238         if (bytes > 0) {
       
   239           /*
       
   240            * Any real server would do some synchronized and some
       
   241            * unsynchronized work on behalf of the client, and
       
   242            * most likely send some data back...but this is a
       
   243            * gross oversimplification.
       
   244            */
       
   245           synchronized(eventSync) {
       
   246             bytesRead += bytes;
       
   247             eventsToProcess--;
       
   248             if (eventsToProcess <= 0) {
       
   249               eventSync.notify();
       
   250             }
       
   251           }
       
   252         }
       
   253       }
       
   254     }
       
   255   }
       
   256 }