74 } |
74 } |
75 }; |
75 }; |
76 } |
76 } |
77 |
77 |
78 public static void main(String[] args) throws Exception { |
78 public static void main(String[] args) throws Exception { |
79 // create listener to accept connections |
|
80 final AsynchronousServerSocketChannel listener = |
|
81 AsynchronousServerSocketChannel.open() |
|
82 .bind(new InetSocketAddress(0)); |
|
83 listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { |
|
84 public void completed(final AsynchronousSocketChannel ch, Void att) { |
|
85 listener.accept((Void)null, this); |
|
86 final ByteBuffer buf = ByteBuffer.allocate(100); |
|
87 ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { |
|
88 public void completed(Integer bytesRead, AsynchronousSocketChannel ch) { |
|
89 if (bytesRead < 0) { |
|
90 try { ch.close(); } catch (IOException ignore) { } |
|
91 } else { |
|
92 buf.clear(); |
|
93 ch.read(buf, ch, this); |
|
94 } |
|
95 } |
|
96 public void failed(Throwable exc, AsynchronousSocketChannel ch) { |
|
97 try { ch.close(); } catch (IOException ignore) { } |
|
98 } |
|
99 }); |
|
100 } |
|
101 public void failed(Throwable exc, Void att) { |
|
102 } |
|
103 }); |
|
104 int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort(); |
|
105 SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port); |
|
106 |
|
107 // create 3-10 channels, each in its own group |
79 // create 3-10 channels, each in its own group |
108 final int groupCount = 3 + rand.nextInt(8); |
80 final int groupCount = 3 + rand.nextInt(8); |
109 AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount]; |
81 final AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount]; |
110 final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount]; |
82 final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount]; |
111 for (int i=0; i<groupCount; i++) { |
83 |
112 ThreadFactory factory = createThreadFactory(i); |
84 // create listener to accept connections |
113 AsynchronousChannelGroup group; |
85 try (final AsynchronousServerSocketChannel listener = |
114 if (rand.nextBoolean()) { |
86 AsynchronousServerSocketChannel.open()) { |
115 int nThreads = 1 + rand.nextInt(10); |
87 |
116 group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory); |
88 listener.bind(new InetSocketAddress(0)); |
117 } else { |
89 listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { |
118 ExecutorService pool = Executors.newCachedThreadPool(factory); |
90 public void completed(final AsynchronousSocketChannel ch, Void att) { |
119 group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5)); |
91 listener.accept((Void)null, this); |
|
92 final ByteBuffer buf = ByteBuffer.allocate(100); |
|
93 ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { |
|
94 public void completed(Integer bytesRead, AsynchronousSocketChannel ch) { |
|
95 if (bytesRead < 0) { |
|
96 try { ch.close(); } catch (IOException ignore) { } |
|
97 } else { |
|
98 buf.clear(); |
|
99 ch.read(buf, ch, this); |
|
100 } |
|
101 } |
|
102 public void failed(Throwable exc, AsynchronousSocketChannel ch) { |
|
103 try { ch.close(); } catch (IOException ignore) { } |
|
104 } |
|
105 }); |
|
106 } |
|
107 public void failed(Throwable exc, Void att) { |
|
108 } |
|
109 }); |
|
110 int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort(); |
|
111 SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port); |
|
112 |
|
113 for (int i=0; i<groupCount; i++) { |
|
114 ThreadFactory factory = createThreadFactory(i); |
|
115 AsynchronousChannelGroup group; |
|
116 if (rand.nextBoolean()) { |
|
117 int nThreads = 1 + rand.nextInt(10); |
|
118 group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory); |
|
119 } else { |
|
120 ExecutorService pool = Executors.newCachedThreadPool(factory); |
|
121 group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5)); |
|
122 } |
|
123 groups[i] = group; |
|
124 |
|
125 // create channel in group and connect it to the server |
|
126 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group); |
|
127 ch.connect(sa).get(); |
|
128 channels[i] = ch; |
120 } |
129 } |
121 groups[i] = group; |
|
122 |
130 |
123 // create channel in group and connect it to the server |
131 // randomly write to each channel, ensuring that the completion handler |
124 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group); |
132 // is always invoked by a thread with the right identity. |
125 ch.connect(sa).get(); |
133 final AtomicInteger writeCount = new AtomicInteger(100); |
126 channels[i] = ch; |
134 channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() { |
|
135 public void completed(Integer bytesWritten, Integer groupId) { |
|
136 if (bytesWritten != 1) |
|
137 fail("Expected 1 byte to be written"); |
|
138 if (!myGroup.get().equals(groupId)) |
|
139 fail("Handler invoked by thread with the wrong identity"); |
|
140 if (writeCount.decrementAndGet() > 0) { |
|
141 int id = rand.nextInt(groupCount); |
|
142 channels[id].write(getBuffer(), id, this); |
|
143 } else { |
|
144 done.countDown(); |
|
145 } |
|
146 } |
|
147 public void failed(Throwable exc, Integer groupId) { |
|
148 fail(exc.getMessage()); |
|
149 } |
|
150 }); |
|
151 |
|
152 // wait until done |
|
153 done.await(); |
|
154 } finally { |
|
155 // clean-up |
|
156 for (AsynchronousSocketChannel ch: channels) |
|
157 ch.close(); |
|
158 for (AsynchronousChannelGroup group: groups) |
|
159 group.shutdownNow(); |
|
160 |
|
161 if (failed.get()) |
|
162 throw new RuntimeException("Test failed - see log for details"); |
127 } |
163 } |
128 |
|
129 // randomly write to each channel, ensuring that the completion handler |
|
130 // is always invoked by a thread with the right identity. |
|
131 final AtomicInteger writeCount = new AtomicInteger(100); |
|
132 channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() { |
|
133 public void completed(Integer bytesWritten, Integer groupId) { |
|
134 if (bytesWritten != 1) |
|
135 fail("Expected 1 byte to be written"); |
|
136 if (!myGroup.get().equals(groupId)) |
|
137 fail("Handler invoked by thread with the wrong identity"); |
|
138 if (writeCount.decrementAndGet() > 0) { |
|
139 int id = rand.nextInt(groupCount); |
|
140 channels[id].write(getBuffer(), id, this); |
|
141 } else { |
|
142 done.countDown(); |
|
143 } |
|
144 } |
|
145 public void failed(Throwable exc, Integer groupId) { |
|
146 fail(exc.getMessage()); |
|
147 } |
|
148 }); |
|
149 |
|
150 // wait until done |
|
151 done.await(); |
|
152 |
|
153 // clean-up |
|
154 for (AsynchronousSocketChannel ch: channels) |
|
155 ch.close(); |
|
156 for (AsynchronousChannelGroup group: groups) |
|
157 group.shutdownNow(); |
|
158 listener.close(); |
|
159 |
|
160 if (failed.get()) |
|
161 throw new RuntimeException("Test failed - see log for details"); |
|
162 } |
164 } |
163 |
165 |
164 static ByteBuffer getBuffer() { |
166 static ByteBuffer getBuffer() { |
165 ByteBuffer buf; |
167 ByteBuffer buf; |
166 if (rand.nextBoolean()) { |
168 if (rand.nextBoolean()) { |