--- a/jdk/test/java/nio/channels/AsynchronousChannelGroup/Unbounded.java Tue Jun 04 21:37:15 2013 -0700
+++ b/jdk/test/java/nio/channels/AsynchronousChannelGroup/Unbounded.java Wed Jun 05 11:12:31 2013 +0100
@@ -43,47 +43,24 @@
static volatile boolean finished;
public static void main(String[] args) throws Exception {
- // all accepted connections are added to a queue
- final ArrayBlockingQueue<AsynchronousSocketChannel> queue =
- new ArrayBlockingQueue<AsynchronousSocketChannel>(CONCURRENCY_COUNT);
-
// create listener to accept connections
- final AsynchronousServerSocketChannel listener =
+ AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open()
.bind(new InetSocketAddress(0));
- listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
- public void completed(AsynchronousSocketChannel ch, Void att) {
- queue.add(ch);
- listener.accept((Void)null, this);
- }
- public void failed(Throwable exc, Void att) {
- if (!finished) {
- failed = true;
- System.err.println("accept failed: " + exc);
- }
- }
- });
- System.out.println("Listener created.");
+
+ // establish connections
- // establish lots of connections
+ AsynchronousSocketChannel[] clients = new AsynchronousSocketChannel[CONCURRENCY_COUNT];
+ AsynchronousSocketChannel[] peers = new AsynchronousSocketChannel[CONCURRENCY_COUNT];
+
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
- AsynchronousSocketChannel[] channels =
- new AsynchronousSocketChannel[CONCURRENCY_COUNT];
+
for (int i=0; i<CONCURRENCY_COUNT; i++) {
- int attempts = 0;
- for (;;) {
- try {
- channels[i] = AsynchronousSocketChannel.open();
- channels[i].connect(sa).get();
- break;
- } catch (IOException x) {
- // probably resource issue so back off and retry
- if (++attempts >= 3)
- throw x;
- Thread.sleep(50);
- }
- }
+ clients[i] = AsynchronousSocketChannel.open();
+ Future<Void> result = clients[i].connect(sa);
+ peers[i] = listener.accept().get();
+ result.get();
}
System.out.println("All connection established.");
@@ -91,9 +68,9 @@
final CyclicBarrier barrier = new CyclicBarrier(CONCURRENCY_COUNT+1);
// initiate a read operation on each channel.
- for (int i=0; i<CONCURRENCY_COUNT; i++) {
+ for (AsynchronousSocketChannel client: clients) {
ByteBuffer buf = ByteBuffer.allocateDirect(100);
- channels[i].read( buf, channels[i],
+ client.read(buf, client,
new CompletionHandler<Integer,AsynchronousSocketChannel>() {
public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
try {
@@ -113,13 +90,10 @@
System.out.println("All read operations outstanding.");
// write data to each of the accepted connections
- int remaining = CONCURRENCY_COUNT;
- while (remaining > 0) {
- AsynchronousSocketChannel ch = queue.take();
- ch.write(ByteBuffer.wrap("welcome".getBytes())).get();
- ch.shutdownOutput();
- ch.close();
- remaining--;
+ for (AsynchronousSocketChannel peer: peers) {
+ peer.write(ByteBuffer.wrap("welcome".getBytes())).get();
+ peer.shutdownOutput();
+ peer.close();
}
// wait for all threads to reach the barrier