79 AsynchronousServerSocketChannel.open() |
80 AsynchronousServerSocketChannel.open() |
80 .bind(new InetSocketAddress(0)); |
81 .bind(new InetSocketAddress(0)); |
81 listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { |
82 listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { |
82 public void completed(final AsynchronousSocketChannel ch, Void att) { |
83 public void completed(final AsynchronousSocketChannel ch, Void att) { |
83 listener.accept((Void)null, this); |
84 listener.accept((Void)null, this); |
84 |
|
85 final ByteBuffer buf = ByteBuffer.allocate(100); |
85 final ByteBuffer buf = ByteBuffer.allocate(100); |
86 ch.read(buf, (Void)null, new CompletionHandler<Integer,Void>() { |
86 ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { |
87 public void completed(Integer bytesRead, Void att) { |
87 public void completed(Integer bytesRead, AsynchronousSocketChannel ch) { |
88 buf.clear(); |
88 if (bytesRead < 0) { |
89 ch.read(buf, (Void)null, this); |
89 try { ch.close(); } catch (IOException ignore) { } |
|
90 } else { |
|
91 buf.clear(); |
|
92 ch.read(buf, ch, this); |
|
93 } |
90 } |
94 } |
91 public void failed(Throwable exc, Void att) { |
95 public void failed(Throwable exc, AsynchronousSocketChannel ch) { |
|
96 try { ch.close(); } catch (IOException ignore) { } |
92 } |
97 } |
93 }); |
98 }); |
94 } |
99 } |
95 public void failed(Throwable exc, Void att) { |
100 public void failed(Throwable exc, Void att) { |
96 } |
101 } |
98 int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort(); |
103 int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort(); |
99 SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port); |
104 SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port); |
100 |
105 |
101 // create 3-10 channels, each in its own group |
106 // create 3-10 channels, each in its own group |
102 final int groupCount = 3 + rand.nextInt(8); |
107 final int groupCount = 3 + rand.nextInt(8); |
103 final AsynchronousSocketChannel[] channel = new AsynchronousSocketChannel[groupCount]; |
108 AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount]; |
|
109 final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount]; |
104 for (int i=0; i<groupCount; i++) { |
110 for (int i=0; i<groupCount; i++) { |
105 ThreadFactory factory = createThreadFactory(i); |
111 ThreadFactory factory = createThreadFactory(i); |
106 AsynchronousChannelGroup group; |
112 AsynchronousChannelGroup group; |
107 if (rand.nextBoolean()) { |
113 if (rand.nextBoolean()) { |
108 int nThreads = 1 + rand.nextInt(10); |
114 int nThreads = 1 + rand.nextInt(10); |
109 group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory); |
115 group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory); |
110 } else { |
116 } else { |
111 ExecutorService pool = Executors.newCachedThreadPool(factory); |
117 ExecutorService pool = Executors.newCachedThreadPool(factory); |
112 group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5)); |
118 group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5)); |
113 } |
119 } |
|
120 groups[i] = group; |
114 |
121 |
115 // create channel in group and connect it to the server |
122 // create channel in group and connect it to the server |
116 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group); |
123 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group); |
117 ch.connect(sa).get(); |
124 ch.connect(sa).get(); |
118 channel[i] = ch; |
125 channels[i] = ch; |
119 } |
126 } |
120 |
127 |
121 // randomly write to each channel, ensuring that the completion handler |
128 // randomly write to each channel, ensuring that the completion handler |
122 // is always invoked by a thread with the right identity. |
129 // is always invoked by a thread with the right identity. |
123 final AtomicInteger writeCount = new AtomicInteger(100); |
130 final AtomicInteger writeCount = new AtomicInteger(100); |
124 channel[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() { |
131 channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() { |
125 public void completed(Integer bytesWritten, Integer groupId) { |
132 public void completed(Integer bytesWritten, Integer groupId) { |
126 if (bytesWritten != 1) |
133 if (bytesWritten != 1) |
127 fail("Expected 1 byte to be written"); |
134 fail("Expected 1 byte to be written"); |
128 if (!myGroup.get().equals(groupId)) |
135 if (!myGroup.get().equals(groupId)) |
129 fail("Handler invoked by thread with the wrong identity"); |
136 fail("Handler invoked by thread with the wrong identity"); |
130 if (writeCount.decrementAndGet() > 0) { |
137 if (writeCount.decrementAndGet() > 0) { |
131 int id = rand.nextInt(groupCount); |
138 int id = rand.nextInt(groupCount); |
132 channel[id].write(getBuffer(), id, this); |
139 channels[id].write(getBuffer(), id, this); |
133 } else { |
140 } else { |
134 done.countDown(); |
141 done.countDown(); |
135 } |
142 } |
136 } |
143 } |
137 public void failed(Throwable exc, Integer groupId) { |
144 public void failed(Throwable exc, Integer groupId) { |
138 fail(exc.getMessage()); |
145 fail(exc.getMessage()); |
139 } |
146 } |
140 }); |
147 }); |
141 |
148 |
142 // wait until |
149 // wait until done |
143 done.await(); |
150 done.await(); |
|
151 |
|
152 // clean-up |
|
153 for (AsynchronousSocketChannel ch: channels) |
|
154 ch.close(); |
|
155 for (AsynchronousChannelGroup group: groups) |
|
156 group.shutdownNow(); |
|
157 listener.close(); |
|
158 |
144 if (failed.get()) |
159 if (failed.get()) |
145 throw new RuntimeException("Test failed - see log for details"); |
160 throw new RuntimeException("Test failed - see log for details"); |
146 } |
161 } |
147 |
162 |
148 static ByteBuffer getBuffer() { |
163 static ByteBuffer getBuffer() { |