/*
 * Copyright 1999-2001 Sun Microsystems, Inc.  All Rights Reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   - Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *   - Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 *   - Neither the name of Sun Microsystems nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

import java.io.*;
import java.lang.Byte;
import java.net.*;

36 /** |
|
37 * Simple Java "server" using the Poller class |
|
38 * to multiplex on incoming connections. Note |
|
39 * that handoff of events, via linked Q is not |
|
40 * actually be a performance booster here, since |
|
41 * the processing of events is cheaper than |
|
42 * the overhead in scheduling/executing them. |
|
43 * Although this demo does allow for concurrency |
|
44 * in handling connections, it uses a rather |
|
45 * primitive "gang scheduling" policy to keep |
|
46 * the code simpler. |
|
47 */ |
|
48 |
|
49 public class PollingServer |
|
50 { |
|
51 public final static int MAXCONN = 10000; |
|
52 public final static int PORTNUM = 4444; |
|
53 public final static int BYTESPEROP = 10; |
|
54 |
|
55 /** |
|
56 * This synchronization object protects access to certain |
|
57 * data (bytesRead,eventsToProcess) by concurrent Consumer threads. |
|
58 */ |
|
59 private final static Object eventSync = new Object(); |
|
60 |
|
61 private static InputStream[] instr = new InputStream[MAXCONN]; |
|
62 private static int[] mapping = new int[65535]; |
|
63 private static LinkedQueue linkedQ = new LinkedQueue(); |
|
64 private static int bytesRead = 0; |
|
65 private static int bytesToRead; |
|
66 private static int eventsToProcess=0; |
|
67 |
|
68 public PollingServer(int concurrency) { |
|
69 Socket[] sockArr = new Socket[MAXCONN]; |
|
70 long timestart, timestop; |
|
71 short[] revents = new short[MAXCONN]; |
|
72 int[] fds = new int[MAXCONN]; |
|
73 int bytes; |
|
74 Poller Mux; |
|
75 int serverFd; |
|
76 int totalConn=0; |
|
77 int connects=0; |
|
78 |
|
79 System.out.println ("Serv: Initializing port " + PORTNUM); |
|
80 try { |
|
81 |
|
82 ServerSocket skMain = new ServerSocket (PORTNUM); |
|
83 /* |
|
84 * Create the Poller object Mux, allow for up to MAXCONN |
|
85 * sockets/filedescriptors to be polled. |
|
86 */ |
|
87 Mux = new Poller(MAXCONN); |
|
88 serverFd = Mux.add(skMain, Poller.POLLIN); |
|
89 |
|
90 Socket ctrlSock = skMain.accept(); |
|
91 |
|
92 BufferedReader ctrlReader = |
|
93 new BufferedReader(new InputStreamReader(ctrlSock.getInputStream())); |
|
94 String ctrlString = ctrlReader.readLine(); |
|
95 bytesToRead = Integer.valueOf(ctrlString).intValue(); |
|
96 ctrlString = ctrlReader.readLine(); |
|
97 totalConn = Integer.valueOf(ctrlString).intValue(); |
|
98 |
|
99 System.out.println("Receiving " + bytesToRead + " bytes from " + |
|
100 totalConn + " client connections"); |
|
101 |
|
102 timestart = System.currentTimeMillis(); |
|
103 |
|
104 /* |
|
105 * Start the consumer threads to read data. |
|
106 */ |
|
107 for (int consumerThread = 0; |
|
108 consumerThread < concurrency; consumerThread++ ) { |
|
109 new Consumer(consumerThread).start(); |
|
110 } |
|
111 |
|
112 /* |
|
113 * Take connections, read Data |
|
114 */ |
|
115 int numEvents=0; |
|
116 |
|
117 while ( bytesRead < bytesToRead ) { |
|
118 |
|
119 int loopWaits=0; |
|
120 while (eventsToProcess > 0) { |
|
121 synchronized (eventSync) { |
|
122 loopWaits++; |
|
123 if (eventsToProcess <= 0) break; |
|
124 try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();}; |
|
125 } |
|
126 } |
|
127 if (loopWaits > 1) |
|
128 System.out.println("Done waiting...loops = " + loopWaits + |
|
129 " events " + numEvents + |
|
130 " bytes read : " + bytesRead ); |
|
131 |
|
132 if (bytesRead >= bytesToRead) break; // may be done! |
|
133 |
|
134 /* |
|
135 * Wait for events |
|
136 */ |
|
137 numEvents = Mux.waitMultiple(100, fds, revents); |
|
138 synchronized (eventSync) { |
|
139 eventsToProcess = numEvents; |
|
140 } |
|
141 /* |
|
142 * Process all the events we got from Mux.waitMultiple |
|
143 */ |
|
144 int cnt = 0; |
|
145 while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) { |
|
146 int fd = fds[cnt]; |
|
147 |
|
148 if (revents[cnt] == Poller.POLLIN) { |
|
149 if (fd == serverFd) { |
|
150 /* |
|
151 * New connection coming in on the ServerSocket |
|
152 * Add the socket to the Mux, keep track of mapping |
|
153 * the fdval returned by Mux.add to the connection. |
|
154 */ |
|
155 sockArr[connects] = skMain.accept(); |
|
156 instr[connects] = sockArr[connects].getInputStream(); |
|
157 int fdval = Mux.add(sockArr[connects], Poller.POLLIN); |
|
158 mapping[fdval] = connects; |
|
159 synchronized(eventSync) { |
|
160 eventsToProcess--; // just processed this one! |
|
161 } |
|
162 connects++; |
|
163 } else { |
|
164 /* |
|
165 * We've got data from this client connection. |
|
166 * Put it on the queue for the consumer threads to process. |
|
167 */ |
|
168 linkedQ.put(new Integer(fd)); |
|
169 } |
|
170 } else { |
|
171 System.out.println("Got revents[" + cnt + "] == " + revents[cnt]); |
|
172 } |
|
173 cnt++; |
|
174 } |
|
175 } |
|
176 timestop = System.currentTimeMillis(); |
|
177 System.out.println("Time for all reads (" + totalConn + |
|
178 " sockets) : " + (timestop-timestart)); |
|
179 |
|
180 // Tell the client it can now go away |
|
181 byte[] buff = new byte[BYTESPEROP]; |
|
182 ctrlSock.getOutputStream().write(buff,0,BYTESPEROP); |
|
183 |
|
184 // Tell the cunsumer threads they can exit. |
|
185 for (int cThread = 0; cThread < concurrency; cThread++ ) { |
|
186 linkedQ.put(new Integer(-1)); |
|
187 } |
|
188 } catch (Exception exc) { exc.printStackTrace(); } |
|
189 } |
|
190 |
|
191 /* |
|
192 * main ... just check if a concurrency was specified |
|
193 */ |
|
194 public static void main (String args[]) |
|
195 { |
|
196 int concurrency; |
|
197 |
|
198 if (args.length == 1) |
|
199 concurrency = java.lang.Integer.valueOf(args[0]).intValue(); |
|
200 else |
|
201 concurrency = Poller.getNumCPUs() + 1; |
|
202 PollingServer server = new PollingServer(concurrency); |
|
203 } |
|
204 |
|
205 /* |
|
206 * This class is for handling the Client data. |
|
207 * The PollingServer spawns off a number of these based upon |
|
208 * the number of CPUs (or concurrency argument). |
|
209 * Each just loops grabbing events off the queue and |
|
210 * processing them. |
|
211 */ |
|
212 class Consumer extends Thread { |
|
213 private int threadNumber; |
|
214 public Consumer(int i) { threadNumber = i; } |
|
215 |
|
216 public void run() { |
|
217 byte[] buff = new byte[BYTESPEROP]; |
|
218 int bytes = 0; |
|
219 |
|
220 InputStream instream; |
|
221 while (bytesRead < bytesToRead) { |
|
222 try { |
|
223 Integer Fd = (Integer) linkedQ.take(); |
|
224 int fd = Fd.intValue(); |
|
225 if (fd == -1) break; /* got told we could exit */ |
|
226 |
|
227 /* |
|
228 * We have to map the fd value returned from waitMultiple |
|
229 * to the actual input stream associated with that fd. |
|
230 * Take a look at how the Mux.add() was done to see how |
|
231 * we stored that. |
|
232 */ |
|
233 int map = mapping[fd]; |
|
234 instream = instr[map]; |
|
235 bytes = instream.read(buff,0,BYTESPEROP); |
|
236 } catch (Exception e) { System.out.println(e.toString()); } |
|
237 |
|
238 if (bytes > 0) { |
|
239 /* |
|
240 * Any real server would do some synchronized and some |
|
241 * unsynchronized work on behalf of the client, and |
|
242 * most likely send some data back...but this is a |
|
243 * gross oversimplification. |
|
244 */ |
|
245 synchronized(eventSync) { |
|
246 bytesRead += bytes; |
|
247 eventsToProcess--; |
|
248 if (eventsToProcess <= 0) { |
|
249 eventSync.notify(); |
|
250 } |
|
251 } |
|
252 } |
|
253 } |
|
254 } |
|
255 } |
|
256 } |