jdk/test/java/nio/channels/AsynchronousChannelGroup/Identity.java
changeset 5970 d4e98bbfb0be
parent 5506 202f599c92aa
child 7668 d4a77089c587
--- a/jdk/test/java/nio/channels/AsynchronousChannelGroup/Identity.java	Tue Jun 22 19:18:06 2010 -0700
+++ b/jdk/test/java/nio/channels/AsynchronousChannelGroup/Identity.java	Wed Jun 23 20:19:29 2010 +0100
@@ -32,6 +32,7 @@
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
+import java.io.IOException;
 
 /**
  * Tests that the completion handler is invoked by a thread with
@@ -81,14 +82,18 @@
         listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
             public void completed(final AsynchronousSocketChannel ch, Void att) {
                 listener.accept((Void)null, this);
-
                 final ByteBuffer buf = ByteBuffer.allocate(100);
-                ch.read(buf, (Void)null, new CompletionHandler<Integer,Void>() {
-                    public void completed(Integer bytesRead, Void att) {
-                        buf.clear();
-                        ch.read(buf, (Void)null, this);
+                ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
+                    public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
+                        if (bytesRead < 0) {
+                            try { ch.close(); } catch (IOException ignore) { }
+                        } else {
+                            buf.clear();
+                            ch.read(buf, ch, this);
+                        }
                     }
-                    public void failed(Throwable exc, Void att) {
+                    public void failed(Throwable exc, AsynchronousSocketChannel ch) {
+                        try { ch.close(); } catch (IOException ignore) { }
                     }
                 });
             }
@@ -100,7 +105,8 @@
 
         // create 3-10 channels, each in its own group
         final int groupCount = 3 + rand.nextInt(8);
-        final AsynchronousSocketChannel[] channel = new AsynchronousSocketChannel[groupCount];
+        AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount];
+        final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount];
         for (int i=0; i<groupCount; i++) {
             ThreadFactory factory = createThreadFactory(i);
             AsynchronousChannelGroup group;
@@ -111,17 +117,18 @@
                 ExecutorService pool = Executors.newCachedThreadPool(factory);
                 group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
             }
+            groups[i] = group;
 
             // create channel in group and connect it to the server
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
             ch.connect(sa).get();
-            channel[i] = ch;
+            channels[i] = ch;
         }
 
         // randomly write to each channel, ensuring that the completion handler
         // is always invoked by a thread with the right identity.
         final AtomicInteger writeCount = new AtomicInteger(100);
-        channel[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
+        channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
             public void completed(Integer bytesWritten, Integer groupId) {
                 if (bytesWritten != 1)
                     fail("Expected 1 byte to be written");
@@ -129,7 +136,7 @@
                     fail("Handler invoked by thread with the wrong identity");
                 if (writeCount.decrementAndGet() > 0) {
                     int id = rand.nextInt(groupCount);
-                    channel[id].write(getBuffer(), id, this);
+                    channels[id].write(getBuffer(), id, this);
                 } else {
                     done.countDown();
                 }
@@ -139,8 +146,16 @@
             }
         });
 
-        // wait until
+        // wait until done
         done.await();
+
+        // clean-up
+        for (AsynchronousSocketChannel ch: channels)
+            ch.close();
+        for (AsynchronousChannelGroup group: groups)
+            group.shutdownNow();
+        listener.close();
+
         if (failed.get())
             throw new RuntimeException("Test failed - see log for details");
     }