--- a/jdk/test/java/nio/channels/AsynchronousChannelGroup/Identity.java Tue Jun 22 19:18:06 2010 -0700
+++ b/jdk/test/java/nio/channels/AsynchronousChannelGroup/Identity.java Wed Jun 23 20:19:29 2010 +0100
@@ -32,6 +32,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import java.io.IOException;
/**
* Tests that the completion handler is invoked by a thread with
@@ -81,14 +82,18 @@
listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
public void completed(final AsynchronousSocketChannel ch, Void att) {
listener.accept((Void)null, this);
-
final ByteBuffer buf = ByteBuffer.allocate(100);
- ch.read(buf, (Void)null, new CompletionHandler<Integer,Void>() {
- public void completed(Integer bytesRead, Void att) {
- buf.clear();
- ch.read(buf, (Void)null, this);
+ ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
+ public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
+ if (bytesRead < 0) {
+ try { ch.close(); } catch (IOException ignore) { }
+ } else {
+ buf.clear();
+ ch.read(buf, ch, this);
+ }
}
- public void failed(Throwable exc, Void att) {
+ public void failed(Throwable exc, AsynchronousSocketChannel ch) {
+ try { ch.close(); } catch (IOException ignore) { }
}
});
}
@@ -100,7 +105,8 @@
// create 3-10 channels, each in its own group
final int groupCount = 3 + rand.nextInt(8);
- final AsynchronousSocketChannel[] channel = new AsynchronousSocketChannel[groupCount];
+ AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount];
+ final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount];
for (int i=0; i<groupCount; i++) {
ThreadFactory factory = createThreadFactory(i);
AsynchronousChannelGroup group;
@@ -111,17 +117,18 @@
ExecutorService pool = Executors.newCachedThreadPool(factory);
group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
}
+ groups[i] = group;
// create channel in group and connect it to the server
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
ch.connect(sa).get();
- channel[i] = ch;
+ channels[i] = ch;
}
// randomly write to each channel, ensuring that the completion handler
// is always invoked by a thread with the right identity.
final AtomicInteger writeCount = new AtomicInteger(100);
- channel[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
+ channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
public void completed(Integer bytesWritten, Integer groupId) {
if (bytesWritten != 1)
fail("Expected 1 byte to be written");
@@ -129,7 +136,7 @@
fail("Handler invoked by thread with the wrong identity");
if (writeCount.decrementAndGet() > 0) {
int id = rand.nextInt(groupCount);
- channel[id].write(getBuffer(), id, this);
+ channels[id].write(getBuffer(), id, this);
} else {
done.countDown();
}
@@ -139,8 +146,16 @@
}
});
- // wait until
+ // wait until done
done.await();
+
+ // clean-up
+ for (AsynchronousSocketChannel ch: channels)
+ ch.close();
+ for (AsynchronousChannelGroup group: groups)
+ group.shutdownNow();
+ listener.close();
+
if (failed.get())
throw new RuntimeException("Test failed - see log for details");
}