jdk/test/java/nio/channels/AsynchronousChannelGroup/Identity.java
changeset 5970 d4e98bbfb0be
parent 5506 202f599c92aa
child 7668 d4a77089c587
equal deleted inserted replaced
5968:4e9dd25279c7 5970:d4e98bbfb0be
    30 import java.nio.channels.*;
    30 import java.nio.channels.*;
    31 import java.net.*;
    31 import java.net.*;
    32 import java.util.*;
    32 import java.util.*;
    33 import java.util.concurrent.*;
    33 import java.util.concurrent.*;
    34 import java.util.concurrent.atomic.*;
    34 import java.util.concurrent.atomic.*;
       
    35 import java.io.IOException;
    35 
    36 
    36 /**
    37 /**
    37  * Tests that the completion handler is invoked by a thread with
    38  * Tests that the completion handler is invoked by a thread with
    38  * the expected identity.
    39  * the expected identity.
    39  */
    40  */
    79             AsynchronousServerSocketChannel.open()
    80             AsynchronousServerSocketChannel.open()
    80                 .bind(new InetSocketAddress(0));
    81                 .bind(new InetSocketAddress(0));
    81         listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
    82         listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
    82             public void completed(final AsynchronousSocketChannel ch, Void att) {
    83             public void completed(final AsynchronousSocketChannel ch, Void att) {
    83                 listener.accept((Void)null, this);
    84                 listener.accept((Void)null, this);
    84 
       
    85                 final ByteBuffer buf = ByteBuffer.allocate(100);
    85                 final ByteBuffer buf = ByteBuffer.allocate(100);
    86                 ch.read(buf, (Void)null, new CompletionHandler<Integer,Void>() {
    86                 ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
    87                     public void completed(Integer bytesRead, Void att) {
    87                     public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
    88                         buf.clear();
    88                         if (bytesRead < 0) {
    89                         ch.read(buf, (Void)null, this);
    89                             try { ch.close(); } catch (IOException ignore) { }
       
    90                         } else {
       
    91                             buf.clear();
       
    92                             ch.read(buf, ch, this);
       
    93                         }
    90                     }
    94                     }
    91                     public void failed(Throwable exc, Void att) {
    95                     public void failed(Throwable exc, AsynchronousSocketChannel ch) {
       
    96                         try { ch.close(); } catch (IOException ignore) { }
    92                     }
    97                     }
    93                 });
    98                 });
    94             }
    99             }
    95             public void failed(Throwable exc, Void att) {
   100             public void failed(Throwable exc, Void att) {
    96             }
   101             }
    98         int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
   103         int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
    99         SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
   104         SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
   100 
   105 
   101         // create 3-10 channels, each in its own group
   106         // create 3-10 channels, each in its own group
   102         final int groupCount = 3 + rand.nextInt(8);
   107         final int groupCount = 3 + rand.nextInt(8);
   103         final AsynchronousSocketChannel[] channel = new AsynchronousSocketChannel[groupCount];
   108         AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount];
       
   109         final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount];
   104         for (int i=0; i<groupCount; i++) {
   110         for (int i=0; i<groupCount; i++) {
   105             ThreadFactory factory = createThreadFactory(i);
   111             ThreadFactory factory = createThreadFactory(i);
   106             AsynchronousChannelGroup group;
   112             AsynchronousChannelGroup group;
   107             if (rand.nextBoolean()) {
   113             if (rand.nextBoolean()) {
   108                 int nThreads = 1 + rand.nextInt(10);
   114                 int nThreads = 1 + rand.nextInt(10);
   109                 group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
   115                 group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
   110             } else {
   116             } else {
   111                 ExecutorService pool = Executors.newCachedThreadPool(factory);
   117                 ExecutorService pool = Executors.newCachedThreadPool(factory);
   112                 group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
   118                 group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
   113             }
   119             }
       
   120             groups[i] = group;
   114 
   121 
   115             // create channel in group and connect it to the server
   122             // create channel in group and connect it to the server
   116             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
   123             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
   117             ch.connect(sa).get();
   124             ch.connect(sa).get();
   118             channel[i] = ch;
   125             channels[i] = ch;
   119         }
   126         }
   120 
   127 
   121         // randomly write to each channel, ensuring that the completion handler
   128         // randomly write to each channel, ensuring that the completion handler
   122         // is always invoked by a thread with the right identity.
   129         // is always invoked by a thread with the right identity.
   123         final AtomicInteger writeCount = new AtomicInteger(100);
   130         final AtomicInteger writeCount = new AtomicInteger(100);
   124         channel[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
   131         channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
   125             public void completed(Integer bytesWritten, Integer groupId) {
   132             public void completed(Integer bytesWritten, Integer groupId) {
   126                 if (bytesWritten != 1)
   133                 if (bytesWritten != 1)
   127                     fail("Expected 1 byte to be written");
   134                     fail("Expected 1 byte to be written");
   128                 if (!myGroup.get().equals(groupId))
   135                 if (!myGroup.get().equals(groupId))
   129                     fail("Handler invoked by thread with the wrong identity");
   136                     fail("Handler invoked by thread with the wrong identity");
   130                 if (writeCount.decrementAndGet() > 0) {
   137                 if (writeCount.decrementAndGet() > 0) {
   131                     int id = rand.nextInt(groupCount);
   138                     int id = rand.nextInt(groupCount);
   132                     channel[id].write(getBuffer(), id, this);
   139                     channels[id].write(getBuffer(), id, this);
   133                 } else {
   140                 } else {
   134                     done.countDown();
   141                     done.countDown();
   135                 }
   142                 }
   136             }
   143             }
   137             public void failed(Throwable exc, Integer groupId) {
   144             public void failed(Throwable exc, Integer groupId) {
   138                 fail(exc.getMessage());
   145                 fail(exc.getMessage());
   139             }
   146             }
   140         });
   147         });
   141 
   148 
   142         // wait until
   149         // wait until done
   143         done.await();
   150         done.await();
       
   151 
       
   152         // clean-up
       
   153         for (AsynchronousSocketChannel ch: channels)
       
   154             ch.close();
       
   155         for (AsynchronousChannelGroup group: groups)
       
   156             group.shutdownNow();
       
   157         listener.close();
       
   158 
   144         if (failed.get())
   159         if (failed.get())
   145             throw new RuntimeException("Test failed - see log for details");
   160             throw new RuntimeException("Test failed - see log for details");
   146     }
   161     }
   147 
   162 
   148     static ByteBuffer getBuffer() {
   163     static ByteBuffer getBuffer() {