jdk/test/java/nio/channels/AsynchronousChannelGroup/Unbounded.java
changeset 17948 de14cee4b93d
parent 14505 1a758bd5b548
child 23010 6dadb192ad81
--- a/jdk/test/java/nio/channels/AsynchronousChannelGroup/Unbounded.java	Tue Jun 04 21:37:15 2013 -0700
+++ b/jdk/test/java/nio/channels/AsynchronousChannelGroup/Unbounded.java	Wed Jun 05 11:12:31 2013 +0100
@@ -43,47 +43,24 @@
     static volatile boolean finished;
 
     public static void main(String[] args) throws Exception {
-        // all accepted connections are added to a queue
-        final ArrayBlockingQueue<AsynchronousSocketChannel> queue =
-            new ArrayBlockingQueue<AsynchronousSocketChannel>(CONCURRENCY_COUNT);
-
         // create listener to accept connections
-        final AsynchronousServerSocketChannel listener =
+        AsynchronousServerSocketChannel listener =
             AsynchronousServerSocketChannel.open()
                 .bind(new InetSocketAddress(0));
-        listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
-            public void completed(AsynchronousSocketChannel ch, Void att) {
-                queue.add(ch);
-                listener.accept((Void)null, this);
-            }
-            public void failed(Throwable exc, Void att) {
-                if (!finished) {
-                    failed = true;
-                    System.err.println("accept failed: " + exc);
-                }
-            }
-        });
-        System.out.println("Listener created.");
+
+        // establish connections
 
-        // establish lots of connections
+        AsynchronousSocketChannel[] clients = new AsynchronousSocketChannel[CONCURRENCY_COUNT];
+        AsynchronousSocketChannel[] peers = new AsynchronousSocketChannel[CONCURRENCY_COUNT];
+
         int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
         SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
-        AsynchronousSocketChannel[] channels =
-            new AsynchronousSocketChannel[CONCURRENCY_COUNT];
+
         for (int i=0; i<CONCURRENCY_COUNT; i++) {
-            int attempts = 0;
-            for (;;) {
-                try {
-                    channels[i] = AsynchronousSocketChannel.open();
-                    channels[i].connect(sa).get();
-                    break;
-                } catch (IOException x) {
-                    // probably resource issue so back off and retry
-                    if (++attempts >= 3)
-                        throw x;
-                    Thread.sleep(50);
-                }
-            }
+            clients[i] = AsynchronousSocketChannel.open();
+            Future<Void> result = clients[i].connect(sa);
+            peers[i] = listener.accept().get();
+            result.get();
         }
         System.out.println("All connection established.");
 
@@ -91,9 +68,9 @@
         final CyclicBarrier barrier = new CyclicBarrier(CONCURRENCY_COUNT+1);
 
         // initiate a read operation on each channel.
-        for (int i=0; i<CONCURRENCY_COUNT; i++) {
+        for (AsynchronousSocketChannel client: clients) {
             ByteBuffer buf = ByteBuffer.allocateDirect(100);
-            channels[i].read( buf, channels[i],
+            client.read(buf, client,
                 new CompletionHandler<Integer,AsynchronousSocketChannel>() {
                     public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
                         try {
@@ -113,13 +90,10 @@
         System.out.println("All read operations outstanding.");
 
         // write data to each of the accepted connections
-        int remaining = CONCURRENCY_COUNT;
-        while (remaining > 0) {
-            AsynchronousSocketChannel ch = queue.take();
-            ch.write(ByteBuffer.wrap("welcome".getBytes())).get();
-            ch.shutdownOutput();
-            ch.close();
-            remaining--;
+        for (AsynchronousSocketChannel peer: peers) {
+            peer.write(ByteBuffer.wrap("welcome".getBytes())).get();
+            peer.shutdownOutput();
+            peer.close();
         }
 
         // wait for all threads to reach the barrier