77 static class Server implements Closeable { |
77 static class Server implements Closeable { |
78 private final ServerSocketChannel ssc; |
78 private final ServerSocketChannel ssc; |
79 private final InetSocketAddress address; |
79 private final InetSocketAddress address; |
80 |
80 |
81 Server() throws IOException { |
81 Server() throws IOException { |
82 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); |
82 this(0); |
83 |
83 } |
84 InetAddress lh = InetAddress.getLocalHost(); |
84 |
85 int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort(); |
85 Server(int recvBufSize) throws IOException { |
86 address = new InetSocketAddress(lh, port); |
86 ssc = ServerSocketChannel.open(); |
|
87 if (recvBufSize > 0) { |
|
88 ssc.setOption(SO_RCVBUF, recvBufSize); |
|
89 } |
|
90 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); |
|
91 address = (InetSocketAddress)ssc.getLocalAddress(); |
87 } |
92 } |
88 |
93 |
89 InetSocketAddress address() { |
94 InetSocketAddress address() { |
90 return address; |
95 return address; |
91 } |
96 } |
323 |
328 |
324 System.out.println("-- asynchronous close when writing --"); |
329 System.out.println("-- asynchronous close when writing --"); |
325 |
330 |
326 ch = AsynchronousSocketChannel.open(); |
331 ch = AsynchronousSocketChannel.open(); |
327 ch.connect(server.address()).get(); |
332 ch.connect(server.address()).get(); |
|
333 SocketChannel peer = server.accept(); |
|
334 peer.setOption(SO_RCVBUF, 1); |
328 |
335 |
329 final AtomicReference<Throwable> writeException = |
336 final AtomicReference<Throwable> writeException = |
330 new AtomicReference<Throwable>(); |
337 new AtomicReference<Throwable>(); |
331 |
338 |
332 // write bytes to fill socket buffer |
339 // write bytes to fill socket buffer |
333 final AtomicInteger numCompleted = new AtomicInteger(); |
340 final AtomicInteger numCompleted = new AtomicInteger(); |
334 ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { |
341 ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { |
335 public void completed(Integer result, AsynchronousSocketChannel ch) { |
342 public void completed(Integer result, AsynchronousSocketChannel ch) { |
|
343 System.out.println("completed write to async channel: " + result); |
336 numCompleted.incrementAndGet(); |
344 numCompleted.incrementAndGet(); |
337 ch.write(genBuffer(), ch, this); |
345 ch.write(genBuffer(), ch, this); |
|
346 System.out.println("started another write to async channel: " + result); |
338 } |
347 } |
339 public void failed(Throwable x, AsynchronousSocketChannel ch) { |
348 public void failed(Throwable x, AsynchronousSocketChannel ch) { |
|
349 System.out.println("failed write to async channel"); |
340 writeException.set(x); |
350 writeException.set(x); |
341 } |
351 } |
342 }); |
352 }); |
343 |
353 |
344 // give time for socket buffer to fill up - |
354 // give time for socket buffer to fill up - |
345 // take pauses until the handler is no longer being invoked |
355 // take pauses until the handler is no longer being invoked |
346 // because all writes are being pended which guarantees that |
356 // because all writes are being pended which guarantees that |
347 // the internal channel state indicates it is writing |
357 // the internal channel state indicates it is writing |
348 int prevNumCompleted = numCompleted.get(); |
358 int prevNumCompleted = numCompleted.get(); |
349 do { |
359 do { |
350 Thread.sleep(1000); |
360 Thread.sleep((long)(1000 * jdk.test.lib.Utils.TIMEOUT_FACTOR)); |
|
361 System.out.println("check if buffer is filled up"); |
351 if (numCompleted.get() == prevNumCompleted) { |
362 if (numCompleted.get() == prevNumCompleted) { |
352 break; |
363 break; |
353 } |
364 } |
354 prevNumCompleted = numCompleted.get(); |
365 prevNumCompleted = numCompleted.get(); |
355 } while (true); |
366 } while (true); |
356 |
367 |
357 // attempt a concurrent write - |
368 // attempt a concurrent write - |
358 // should fail with WritePendingException |
369 // should fail with WritePendingException |
359 try { |
370 try { |
|
371 System.out.println("concurrent write to async channel"); |
360 ch.write(genBuffer()); |
372 ch.write(genBuffer()); |
|
373 System.out.format("prevNumCompleted: %d, numCompleted: %d%n", |
|
374 prevNumCompleted, numCompleted.get()); |
361 throw new RuntimeException("WritePendingException expected"); |
375 throw new RuntimeException("WritePendingException expected"); |
362 } catch (WritePendingException x) { |
376 } catch (WritePendingException x) { |
363 } |
377 } |
364 |
378 |
365 // close channel - should cause initial write to complete |
379 // close channel - should cause initial write to complete |
|
380 System.out.println("closing async channel..."); |
366 ch.close(); |
381 ch.close(); |
367 server.accept().close(); |
382 System.out.println("closed async channel"); |
|
383 peer.close(); |
368 |
384 |
369 // wait for exception |
385 // wait for exception |
370 while (writeException.get() == null) { |
386 while (writeException.get() == null) { |
371 Thread.sleep(100); |
387 Thread.sleep(100); |
372 } |
388 } |