jdk/test/java/nio/channels/AsynchronousChannelGroup/Identity.java
changeset 40545 11e43161b14e
parent 30046 cf2c86e1819e
equal deleted inserted replaced
40544:807dd9a425db 40545:11e43161b14e
     1 /*
     1 /*
     2  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.
     7  * published by the Free Software Foundation.
    74             }
    74             }
    75         };
    75         };
    76     }
    76     }
    77 
    77 
    78     public static void main(String[] args) throws Exception {
    78     public static void main(String[] args) throws Exception {
    79         // create listener to accept connections
       
    80         final AsynchronousServerSocketChannel listener =
       
    81             AsynchronousServerSocketChannel.open()
       
    82                 .bind(new InetSocketAddress(0));
       
    83         listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
       
    84             public void completed(final AsynchronousSocketChannel ch, Void att) {
       
    85                 listener.accept((Void)null, this);
       
    86                 final ByteBuffer buf = ByteBuffer.allocate(100);
       
    87                 ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
       
    88                     public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
       
    89                         if (bytesRead < 0) {
       
    90                             try { ch.close(); } catch (IOException ignore) { }
       
    91                         } else {
       
    92                             buf.clear();
       
    93                             ch.read(buf, ch, this);
       
    94                         }
       
    95                     }
       
    96                     public void failed(Throwable exc, AsynchronousSocketChannel ch) {
       
    97                         try { ch.close(); } catch (IOException ignore) { }
       
    98                     }
       
    99                 });
       
   100             }
       
   101             public void failed(Throwable exc, Void att) {
       
   102             }
       
   103         });
       
   104         int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
       
   105         SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
       
   106 
       
   107         // create 3-10 channels, each in its own group
    79         // create 3-10 channels, each in its own group
   108         final int groupCount = 3 + rand.nextInt(8);
    80         final int groupCount = 3 + rand.nextInt(8);
   109         AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount];
    81         final AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount];
   110         final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount];
    82         final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount];
   111         for (int i=0; i<groupCount; i++) {
    83 
   112             ThreadFactory factory = createThreadFactory(i);
    84         // create listener to accept connections
   113             AsynchronousChannelGroup group;
    85         try (final AsynchronousServerSocketChannel listener =
   114             if (rand.nextBoolean()) {
    86                 AsynchronousServerSocketChannel.open()) {
   115                 int nThreads = 1 + rand.nextInt(10);
    87 
   116                 group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
    88             listener.bind(new InetSocketAddress(0));
   117             } else {
    89             listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
   118                 ExecutorService pool = Executors.newCachedThreadPool(factory);
    90                 public void completed(final AsynchronousSocketChannel ch, Void att) {
   119                 group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
    91                     listener.accept((Void)null, this);
       
    92                     final ByteBuffer buf = ByteBuffer.allocate(100);
       
    93                     ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
       
    94                         public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
       
    95                             if (bytesRead < 0) {
       
    96                                 try { ch.close(); } catch (IOException ignore) { }
       
    97                             } else {
       
    98                                 buf.clear();
       
    99                                 ch.read(buf, ch, this);
       
   100                             }
       
   101                         }
       
   102                         public void failed(Throwable exc, AsynchronousSocketChannel ch) {
       
   103                             try { ch.close(); } catch (IOException ignore) { }
       
   104                         }
       
   105                     });
       
   106                 }
       
   107                 public void failed(Throwable exc, Void att) {
       
   108                 }
       
   109             });
       
   110             int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
       
   111             SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
       
   112 
       
   113             for (int i=0; i<groupCount; i++) {
       
   114                 ThreadFactory factory = createThreadFactory(i);
       
   115                 AsynchronousChannelGroup group;
       
   116                 if (rand.nextBoolean()) {
       
   117                     int nThreads = 1 + rand.nextInt(10);
       
   118                     group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
       
   119                 } else {
       
   120                     ExecutorService pool = Executors.newCachedThreadPool(factory);
       
   121                     group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
       
   122                 }
       
   123                 groups[i] = group;
       
   124 
       
   125                 // create channel in group and connect it to the server
       
   126                 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
       
   127                 ch.connect(sa).get();
       
   128                 channels[i] = ch;
   120             }
   129             }
   121             groups[i] = group;
       
   122 
   130 
   123             // create channel in group and connect it to the server
   131             // randomly write to each channel, ensuring that the completion handler
   124             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
   132             // is always invoked by a thread with the right identity.
   125             ch.connect(sa).get();
   133             final AtomicInteger writeCount = new AtomicInteger(100);
   126             channels[i] = ch;
   134             channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
       
   135                 public void completed(Integer bytesWritten, Integer groupId) {
       
   136                     if (bytesWritten != 1)
       
   137                         fail("Expected 1 byte to be written");
       
   138                     if (!myGroup.get().equals(groupId))
       
   139                         fail("Handler invoked by thread with the wrong identity");
       
   140                     if (writeCount.decrementAndGet() > 0) {
       
   141                         int id = rand.nextInt(groupCount);
       
   142                         channels[id].write(getBuffer(), id, this);
       
   143                     } else {
       
   144                         done.countDown();
       
   145                     }
       
   146                 }
       
   147                 public void failed(Throwable exc, Integer groupId) {
       
   148                     fail(exc.getMessage());
       
   149                 }
       
   150             });
       
   151 
       
   152             // wait until done
       
   153             done.await();
       
   154         } finally {
       
   155             // clean-up
       
   156             for (AsynchronousSocketChannel ch: channels)
       
   157                 ch.close();
       
   158             for (AsynchronousChannelGroup group: groups)
       
   159                 group.shutdownNow();
       
   160 
       
   161             if (failed.get())
       
   162                 throw new RuntimeException("Test failed - see log for details");
   127         }
   163         }
   128 
       
   129         // randomly write to each channel, ensuring that the completion handler
       
   130         // is always invoked by a thread with the right identity.
       
   131         final AtomicInteger writeCount = new AtomicInteger(100);
       
   132         channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
       
   133             public void completed(Integer bytesWritten, Integer groupId) {
       
   134                 if (bytesWritten != 1)
       
   135                     fail("Expected 1 byte to be written");
       
   136                 if (!myGroup.get().equals(groupId))
       
   137                     fail("Handler invoked by thread with the wrong identity");
       
   138                 if (writeCount.decrementAndGet() > 0) {
       
   139                     int id = rand.nextInt(groupCount);
       
   140                     channels[id].write(getBuffer(), id, this);
       
   141                 } else {
       
   142                     done.countDown();
       
   143                 }
       
   144             }
       
   145             public void failed(Throwable exc, Integer groupId) {
       
   146                 fail(exc.getMessage());
       
   147             }
       
   148         });
       
   149 
       
   150         // wait until done
       
   151         done.await();
       
   152 
       
   153         // clean-up
       
   154         for (AsynchronousSocketChannel ch: channels)
       
   155             ch.close();
       
   156         for (AsynchronousChannelGroup group: groups)
       
   157             group.shutdownNow();
       
   158         listener.close();
       
   159 
       
   160         if (failed.get())
       
   161             throw new RuntimeException("Test failed - see log for details");
       
   162     }
   164     }
   163 
   165 
   164     static ByteBuffer getBuffer() {
   166     static ByteBuffer getBuffer() {
   165         ByteBuffer buf;
   167         ByteBuffer buf;
   166         if (rand.nextBoolean()) {
   168         if (rand.nextBoolean()) {